You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[15/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
new file mode 100644
index 0000000..24b8cc3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -0,0 +1,1243 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+
+
+public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
+{
+  /**
+   * Result of the synchronous create: OK when the node was created by the call,
+   * NODE_EXISTS when the node was already present, ERROR for an invalid create mode
+   * or any other failure.
+   */
+  enum RetCode
+  {
+    OK, NODE_EXISTS, ERROR
+  }
+
+  private static Logger  LOG = Logger.getLogger(ZkBaseDataAccessor.class);
+
+  private final ZkClient _zkClient;
+
+  /**
+   * Creates an accessor that performs all of its operations through the given client.
+   * 
+   * @param zkClient
+   *          client used by every method of this accessor; stored as-is, not validated
+   */
+  public ZkBaseDataAccessor(ZkClient zkClient)
+  {
+    _zkClient = zkClient;
+  }
+
+  /**
+   * Synchronous create of a single node.
+   * 
+   * @return true when the underlying create reports RetCode.OK, false otherwise
+   *         (including when the node already exists)
+   */
+  @Override
+  public boolean create(String path, T record, int options)
+  {
+    RetCode rc = create(path, record, null, options);
+    return RetCode.OK == rc;
+  }
+
+  /**
+   * Synchronous create of a single node. If the create fails because the parent
+   * does not exist, the parent chain is created first (with null data and
+   * AccessOption.PERSISTENT) and the create is retried.
+   * 
+   * @param path
+   *          path of the node to create
+   * @param record
+   *          data to store in the new node
+   * @param pathCreated
+   *          if not null, every path created by this call is appended to it
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return OK if the node was created, NODE_EXISTS if it already exists, ERROR on
+   *         invalid options or any other failure, including failure to create a parent
+   */
+  public RetCode create(String path, T record, List<String> pathCreated, int options)
+  {
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid create mode. options: " + options);
+      return RetCode.ERROR;
+    }
+
+    while (true)
+    {
+      try
+      {
+        _zkClient.create(path, record, mode);
+        if (pathCreated != null)
+        {
+          pathCreated.add(path);
+        }
+        return RetCode.OK;
+      }
+      catch (ZkNoNodeException e)
+      {
+        // this will happen if parent node does not exist
+        String parentPath = new File(path).getParent();
+        RetCode parentRc;
+        try
+        {
+          parentRc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
+        }
+        catch (Exception e1)
+        {
+          LOG.error("Exception while creating path: " + parentPath, e1);
+          return RetCode.ERROR;
+        }
+
+        if (parentRc != RetCode.OK && parentRc != RetCode.NODE_EXISTS)
+        {
+          // the parent could not be created, so the child cannot be created either;
+          // previously this case fell out of the loop and was reported as OK
+          return RetCode.ERROR;
+        }
+        // parent node created/exists: loop around and retry the child
+      }
+      catch (ZkNodeExistsException e)
+      {
+        LOG.warn("Node already exists. path: " + path);
+        return RetCode.NODE_EXISTS;
+      }
+      catch (Exception e)
+      {
+        LOG.error("Exception while creating path: " + path, e);
+        return RetCode.ERROR;
+      }
+    }
+  }
+
+  /**
+   * sync set
+   */
+  @Override
+  public boolean set(String path, T record, int options)
+  {
+    return set(path, record, null, null, -1, options);
+  }
+
+  /**
+   * Synchronous set of a single node. If the node does not exist it is created
+   * (through the synchronous create) instead; if that create loses a race and
+   * reports NODE_EXISTS, the write is retried.
+   * 
+   * @param path
+   *          path of the node to write
+   * @param record
+   *          data to write
+   * @param pathsCreated
+   *          passed to create; if not null, receives the paths created by this call
+   * @param setstat
+   *          if not null, receives the stat of the write; if node is created instead
+   *          of set, stat will NOT be set
+   * @param expectVersion
+   *          version passed to the client's writeDataGetStat
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return true if the data was written or the node was created, false on invalid
+   *         options or any other failure
+   * @throws ZkBadVersionException
+   *           rethrown as-is when the write fails on a version mismatch
+   */
+  public boolean set(String path,
+                     T record,
+                     List<String> pathsCreated,
+                     Stat setstat,
+                     int expectVersion,
+                     int options)
+  {
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid set mode. options: " + options);
+      return false;
+    }
+
+    boolean retry;
+    do
+    {
+      retry = false;
+      try
+      {
+        Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
+        if (setstat != null)
+        {
+          DataTree.copyStat(setStat, setstat);
+        }
+      }
+      catch (ZkNoNodeException e)
+      {
+        // node not exists, try create. in this case, stat will not be set
+        try
+        {
+          RetCode rc = create(path, record, pathsCreated, options);
+          switch (rc)
+          {
+          case OK:
+            // not set stat if node is created (instead of set)
+            break;
+          case NODE_EXISTS:
+            // somebody created the node in between: go back and write to it
+            retry = true;
+            break;
+          default:
+            LOG.error("Fail to set path by creating: " + path);
+            return false;
+          }
+        }
+        catch (Exception e1)
+        {
+          // log the failure of the create attempt, not the NoNode that triggered it
+          LOG.error("Exception while setting path by creating: " + path, e1);
+          return false;
+        }
+      }
+      catch (ZkBadVersionException e)
+      {
+        // version conflicts are the caller's business
+        throw e;
+      }
+      catch (Exception e)
+      {
+        LOG.error("Exception while setting path: " + path, e);
+        return false;
+      }
+    }
+    while (retry);
+
+    return true;
+  }
+
+  /**
+   * Synchronous update of a single node.
+   * 
+   * @return true when the delegated update yields non-null data, false otherwise
+   */
+  @Override
+  public boolean update(String path, DataUpdater<T> updater, int options)
+  {
+    T updated = update(path, updater, null, null, options);
+    return updated != null;
+  }
+
+  /**
+   * Synchronous read-modify-write of a single node. The current data is read, passed
+   * through the updater and written back with the version that was read; a version
+   * conflict restarts the cycle. If the node does not exist, the updater is applied to
+   * null and the result is created instead.
+   * 
+   * @param createPaths
+   *          passed to create; if not null, receives the paths created by this call
+   * @param stat
+   *          if not null, receives the stat of the write (not filled when the node is
+   *          created instead)
+   * @return: updatedData on success, or null on fail
+   */
+  public T update(String path,
+                  DataUpdater<T> updater,
+                  List<String> createPaths,
+                  Stat stat,
+                  int options)
+  {
+    if (AccessOption.getMode(options) == null)
+    {
+      LOG.error("Invalid update mode. options: " + options);
+      return null;
+    }
+
+    while (true)
+    {
+      try
+      {
+        Stat versionHolder = new Stat();
+        T current = (T) _zkClient.readData(path, versionHolder);
+        T replacement = updater.update(current);
+        Stat written =
+            _zkClient.writeDataGetStat(path, replacement, versionHolder.getVersion());
+        if (stat != null)
+        {
+          DataTree.copyStat(written, stat);
+        }
+        return replacement;
+      }
+      catch (ZkBadVersionException e)
+      {
+        // someone else wrote in between: read again and retry
+        continue;
+      }
+      catch (ZkNoNodeException e)
+      {
+        // node not exist, try create
+        try
+        {
+          T initial = updater.update(null);
+          RetCode rc = create(path, initial, createPaths, options);
+          if (rc == RetCode.OK)
+          {
+            return initial;
+          }
+          if (rc != RetCode.NODE_EXISTS)
+          {
+            LOG.error("Fail to update path by creating: " + path);
+            return null;
+          }
+          // NODE_EXISTS: the node appeared meanwhile, fall through and retry the update
+        }
+        catch (Exception e1)
+        {
+          LOG.error("Exception while updating path by creating: " + path, e1);
+          return null;
+        }
+      }
+      catch (Exception e)
+      {
+        LOG.error("Exception while updating path: " + path, e);
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Synchronous read of a single node.
+   * 
+   * @param stat
+   *          handed to the client's readData together with the path
+   * @return the node data, or null when the node does not exist and the options do not
+   *         request an exception
+   * @throws ZkNoNodeException
+   *           when the node does not exist and
+   *           AccessOption.isThrowExceptionIfNotExist(options) is true
+   */
+  @Override
+  public T get(String path, Stat stat, int options)
+  {
+    try
+    {
+      return (T) _zkClient.readData(path, stat);
+    }
+    catch (ZkNoNodeException e)
+    {
+      boolean mustThrow = AccessOption.isThrowExceptionIfNotExist(options);
+      if (mustThrow)
+      {
+        throw e;
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Asynchronous read of several nodes; every path is read.
+   * 
+   * @param paths
+   *          paths to read; null or empty yields an empty list
+   * @param stats
+   *          if not null, it is cleared and filled with one entry per path (null for
+   *          paths that could not be read)
+   * @param options
+   *          not consulted by this method
+   * @return one entry per path, null for paths that could not be read
+   */
+  @Override
+  public List<T> get(List<String> paths, List<Stat> stats, int options)
+  {
+    // guard here: sizing the needRead array would otherwise NPE on null paths
+    if (paths == null || paths.size() == 0)
+    {
+      return Collections.emptyList();
+    }
+
+    boolean[] needRead = new boolean[paths.size()];
+    Arrays.fill(needRead, true);
+
+    return get(paths, stats, needRead);
+  }
+
+  /**
+   * async get
+   */
+  List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
+  {
+    if (paths == null || paths.size() == 0)
+    {
+      return Collections.emptyList();
+    }
+
+    // init stats
+    if (stats != null)
+    {
+      stats.clear();
+      stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
+    }
+
+    long startT = System.nanoTime();
+
+    try
+    {
+      // issue asyn get requests
+      GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
+      for (int i = 0; i < paths.size(); i++)
+      {
+        if (!needRead[i])
+          continue;
+
+        String path = paths.get(i);
+        cbList[i] = new GetDataCallbackHandler();
+        _zkClient.asyncGetData(path, cbList[i]);
+      }
+
+      // wait for completion
+      for (int i = 0; i < cbList.length; i++)
+      {
+        if (!needRead[i])
+          continue;
+
+        GetDataCallbackHandler cb = cbList[i];
+        cb.waitForSuccess();
+      }
+
+      // construct return results
+      List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+
+      for (int i = 0; i < paths.size(); i++)
+      {
+        if (!needRead[i])
+          continue;
+
+        GetDataCallbackHandler cb = cbList[i];
+        if (Code.get(cb.getRc()) == Code.OK)
+        {
+          @SuppressWarnings("unchecked")
+          T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
+          records.set(i, record);
+          if (stats != null)
+          {
+            stats.set(i, cb._stat);
+          }
+        }
+      }
+
+      return records;
+    }
+    finally
+    {
+      long endT = System.nanoTime();
+      LOG.info("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + (endT - startT) + " ns");
+    }
+  }
+
+  /**
+   * Reads the data of all children of a node (child names synchronously, child data
+   * asynchronously). Children whose read yields no stat are left out of the result.
+   * 
+   * @param stats
+   *          if not null, it is cleared and filled with the stats of the returned
+   *          records, in the same order
+   * @return the records of the readable children; an empty list if the parent does not
+   *         exist or has no children
+   */
+  @Override
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+  {
+    try
+    {
+      List<String> childNames = getChildNames(parentPath, options);
+      if (childNames == null || childNames.size() == 0)
+      {
+        return Collections.emptyList();
+      }
+
+      // full paths of all children
+      List<String> childPaths = new ArrayList<String>();
+      for (String name : childNames)
+      {
+        childPaths.add(parentPath + "/" + name);
+      }
+
+      List<Stat> readStats = new ArrayList<Stat>(childPaths.size());
+      List<T> readRecords = get(childPaths, readStats, options);
+
+      // keep only the children that came back with a stat
+      List<T> records = new ArrayList<T>();
+      List<Stat> curStats = new ArrayList<Stat>();
+      for (int i = 0; i < readStats.size(); i++)
+      {
+        Stat childStat = readStats.get(i);
+        if (childStat != null)
+        {
+          records.add(readRecords.get(i));
+          curStats.add(childStat);
+        }
+      }
+
+      if (stats != null)
+      {
+        stats.clear();
+        stats.addAll(curStats);
+      }
+
+      return records;
+    }
+    catch (ZkNoNodeException e)
+    {
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Synchronously lists the child names of a node, sorted in natural order.
+   * 
+   * @param options
+   *          not consulted by this method
+   * @return the sorted child names, or null if parentPath doesn't exist
+   */
+  @Override
+  public List<String> getChildNames(String parentPath, int options)
+  {
+    List<String> names;
+    try
+    {
+      names = _zkClient.getChildren(parentPath);
+    }
+    catch (ZkNoNodeException e)
+    {
+      return null;
+    }
+
+    Collections.sort(names);
+    return names;
+  }
+
+  /**
+   * sync exists
+   * 
+   */
+  @Override
+  public boolean exists(String path, int options)
+  {
+    return _zkClient.exists(path);
+  }
+
+  /**
+   * Synchronous stat lookup of a single node.
+   * 
+   * @param options
+   *          not consulted by this method
+   * @return whatever the client's getStat(path) reports
+   */
+  @Override
+  public Stat getStat(String path, int options)
+  {
+    Stat nodeStat = _zkClient.getStat(path);
+    return nodeStat;
+  }
+
+  /**
+   * Synchronous remove of a single node. A plain delete is tried first; if it fails
+   * with a ZkException the node is removed with deleteRecursive instead.
+   * 
+   * @param options
+   *          not consulted by this method
+   * @return always true; exceptions other than a ZkException from the first delete,
+   *         and any exception from deleteRecursive, are not caught here
+   */
+  @Override
+  public boolean remove(String path, int options)
+  {
+    try
+    {
+      // optimize on common path
+      _zkClient.delete(path);
+    }
+    catch (ZkException e)
+    {
+      // presumably the node has children (or another delete problem): fall back to
+      // the recursive delete
+      _zkClient.deleteRecursive(path);
+    }
+    return true;
+  }
+
+  /**
+   * Asynchronous create of several nodes. Gives up on a path on any error other than
+   * NONODE; on NONODE the parents are created recursively (with null data and
+   * AccessOption.PERSISTENT) and the create is retried.
+   * 
+   * @param paths
+   *          paths to create
+   * @param records
+   *          data per path, or null to create every node with null data
+   * @param needCreate
+   *          needCreate[i] tells whether paths.get(i) is attempted; the array is
+   *          modified: an entry is cleared once its create finished with anything but
+   *          NONODE
+   * @param pathsCreated
+   *          if not null, pathsCreated.get(i) receives every path created on behalf of
+   *          paths.get(i) (the inner list is allocated on demand)
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return one callback per path holding the result of its last create attempt; the
+   *         entry is null for paths that were never attempted, and all entries are
+   *         null when the options are invalid
+   * @throws IllegalArgumentException
+   *           if the sizes of the arguments do not match the size of paths
+   */
+  CreateCallbackHandler[] create(List<String> paths,
+                                 List<T> records,
+                                 boolean[] needCreate,
+                                 List<List<String>> pathsCreated,
+                                 int options)
+  {
+    if ((records != null && records.size() != paths.size())
+        || needCreate.length != paths.size()
+        || (pathsCreated != null && pathsCreated.size() != paths.size()))
+    {
+      throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
+    }
+
+    CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
+
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid async set mode. options: " + options);
+      return cbList;
+    }
+
+    boolean retry;
+    do
+    {
+      retry = false;
+
+      // issue the creates for every path that still needs one
+      for (int i = 0; i < paths.size(); i++)
+      {
+        if (!needCreate[i])
+          continue;
+
+        String path = paths.get(i);
+        T record = records == null ? null : records.get(i);
+        cbList[i] = new CreateCallbackHandler();
+        _zkClient.asyncCreate(path, record, mode, cbList[i]);
+      }
+
+      // parentPaths[i] is set only for paths whose create failed on NONODE
+      List<String> parentPaths =
+          new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
+      boolean failOnNoNode = false;
+
+      // wait for the creates issued above and classify their results
+      for (int i = 0; i < paths.size(); i++)
+      {
+        if (!needCreate[i])
+          continue;
+
+        CreateCallbackHandler cb = cbList[i];
+        cb.waitForSuccess();
+        String path = paths.get(i);
+
+        if (Code.get(cb.getRc()) == Code.NONODE)
+        {
+          String parentPath = new File(path).getParent();
+          parentPaths.set(i, parentPath);
+          failOnNoNode = true;
+        }
+        else
+        {
+          // if create succeed or fail on error other than NONODE,
+          // give up
+          needCreate[i] = false;
+
+          // if succeeds, record what paths we've created
+          if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
+          {
+            if (pathsCreated.get(i) == null)
+            {
+              pathsCreated.set(i, new ArrayList<String>());
+            }
+            pathsCreated.get(i).add(path);
+          }
+        }
+      }
+
+      if (failOnNoNode)
+      {
+        // at this point needCreate is true exactly for the paths that failed on
+        // NONODE; the copy keeps the recursive call from touching our own flags
+        boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
+
+        CreateCallbackHandler[] parentCbList =
+            create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+        for (int i = 0; i < parentCbList.length; i++)
+        {
+          CreateCallbackHandler parentCb = parentCbList[i];
+          if (parentCb == null)
+            continue;
+
+          Code rc = Code.get(parentCb.getRc());
+
+          // if parent is created, retry create child
+          if (rc == Code.OK || rc == Code.NODEEXISTS)
+          {
+            retry = true;
+            break;
+          }
+        }
+      }
+    }
+    while (retry);
+
+    return cbList;
+  }
+
+
+  /**
+   * Asynchronous create of several nodes, creating missing parents on the way.
+   * 
+   * TODO: rename to create
+   * 
+   * @param paths
+   *          paths to create; null or empty yields an empty result
+   * @param records
+   *          data per path, or null to create every node with null data
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return one flag per path, true when the create of that path returned Code.OK;
+   *         all false when the options are invalid
+   */
+  @Override
+  public boolean[] createChildren(List<String> paths, List<T> records, int options)
+  {
+    // nothing to do; also keeps the paths.get(0) in the finally block below safe
+    if (paths == null || paths.size() == 0)
+    {
+      return new boolean[0];
+    }
+
+    boolean[] success = new boolean[paths.size()];
+
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid async create mode. options: " + options);
+      return success;
+    }
+
+    boolean[] needCreate = new boolean[paths.size()];
+    Arrays.fill(needCreate, true);
+    List<List<String>> pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
+
+    long startT = System.nanoTime();
+    try
+    {
+      CreateCallbackHandler[] cbList =
+          create(paths, records, needCreate, pathsCreated, options);
+
+      for (int i = 0; i < cbList.length; i++)
+      {
+        CreateCallbackHandler cb = cbList[i];
+        // a missing callback means the path was never attempted
+        success[i] = (cb != null && Code.get(cb.getRc()) == Code.OK);
+      }
+
+      return success;
+    }
+    finally
+    {
+      long endT = System.nanoTime();
+      LOG.info("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + (endT - startT) + " ns");
+    }
+  }
+
+  /**
+   * async set
+   * 
+   * TODO: rename to set
+   * 
+   */
+  @Override
+  public boolean[] setChildren(List<String> paths, List<T> records, int options)
+  {
+    return set(paths, records, null, null, options);
+  }
+
+  /**
+   * Asynchronous set of several nodes. Gives up on a path on any error other than
+   * NoNode; on NoNode the node is created instead, and if that create reports
+   * NodeExists the set of that path is retried.
+   * 
+   * @param paths
+   *          paths to write; null or empty yields an empty result
+   * @param records
+   *          data per path, or null to write null data to every path
+   * @param pathsCreated
+   *          passed to create; if not null, receives per path the paths created for it
+   * @param stats
+   *          if not null, it is cleared and filled with one entry per path: the stat of
+   *          the write, ZNode.ZERO_STAT if the node was created instead, or null on
+   *          failure
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return one flag per path, true when the path was written or created; all false
+   *         when the options are invalid
+   * @throws IllegalArgumentException
+   *           if records or pathsCreated is given with a size different from paths
+   */
+  boolean[] set(List<String> paths,
+                List<T> records,
+                List<List<String>> pathsCreated,
+                List<Stat> stats,
+                int options)
+  {
+    if (paths == null || paths.size() == 0)
+    {
+      return new boolean[0];
+    }
+
+    if ((records != null && records.size() != paths.size())
+        || (pathsCreated != null && pathsCreated.size() != paths.size()))
+    {
+      throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
+    }
+
+    boolean[] success = new boolean[paths.size()];
+
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid async set mode. options: " + options);
+      return success;
+    }
+
+    List<Stat> setStats =
+        new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+    boolean[] needSet = new boolean[paths.size()];
+    Arrays.fill(needSet, true);
+
+    long startT = System.nanoTime();
+
+    try
+    {
+      boolean retry;
+      do
+      {
+        retry = false;
+
+        // fresh callbacks every round: results of earlier rounds must not be
+        // evaluated again for paths that are already settled
+        SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
+
+        for (int i = 0; i < paths.size(); i++)
+        {
+          if (!needSet[i])
+            continue;
+
+          String path = paths.get(i);
+          // records may legitimately be null (see the argument check above)
+          T record = records == null ? null : records.get(i);
+          cbList[i] = new SetDataCallbackHandler();
+          _zkClient.asyncSetData(path, record, -1, cbList[i]);
+        }
+
+        boolean failOnNoNode = false;
+
+        for (int i = 0; i < cbList.length; i++)
+        {
+          SetDataCallbackHandler cb = cbList[i];
+          if (cb == null)
+            continue;
+
+          cb.waitForSuccess();
+          Code rc = Code.get(cb.getRc());
+          switch (rc)
+          {
+          case OK:
+            setStats.set(i, cb.getStat());
+            success[i] = true;
+            needSet[i] = false;
+            break;
+          case NONODE:
+            // if fail on NoNode, try create the node
+            failOnNoNode = true;
+            break;
+          default:
+            // if fail on error other than NoNode, give up
+            needSet[i] = false;
+            break;
+          }
+        }
+
+        // if failOnNoNode, try create
+        if (failOnNoNode)
+        {
+          // needSet is now true exactly for the paths that failed on NoNode
+          boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
+          CreateCallbackHandler[] createCbList =
+              create(paths, records, needCreate, pathsCreated, options);
+          for (int i = 0; i < createCbList.length; i++)
+          {
+            CreateCallbackHandler createCb = createCbList[i];
+            if (createCb == null)
+            {
+              continue;
+            }
+
+            Code rc = Code.get(createCb.getRc());
+            switch (rc)
+            {
+            case OK:
+              setStats.set(i, ZNode.ZERO_STAT);
+              success[i] = true;
+              needSet[i] = false;
+              break;
+            case NODEEXISTS:
+              // the node appeared meanwhile: set it in the next round
+              retry = true;
+              break;
+            default:
+              // if creation fails on error other than NodeExists
+              // no need to retry set
+              needSet[i] = false;
+              break;
+            }
+          }
+        }
+      }
+      while (retry);
+
+      if (stats != null)
+      {
+        stats.clear();
+        stats.addAll(setStats);
+      }
+
+      return success;
+    }
+    finally
+    {
+      long endT = System.nanoTime();
+      LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + (endT - startT) + " ns");
+    }
+  }
+
+  // TODO: rename to update
+  /**
+   * Asynchronous update of several nodes.
+   * 
+   * @return one flag per path, true when the delegated update produced non-null data
+   *         for that path
+   */
+  @Override
+  public boolean[] updateChildren(List<String> paths,
+                                  List<DataUpdater<T>> updaters,
+                                  int options)
+  {
+    List<T> updated = update(paths, updaters, null, null, options);
+
+    final int size = paths.size();
+    boolean[] success = new boolean[size]; // all false initially
+    int i = 0;
+    while (i < size)
+    {
+      success[i] = updated.get(i) != null;
+      i++;
+    }
+    return success;
+  }
+
+  /**
+   * Asynchronous read-modify-write of several nodes. Each round reads the paths that
+   * are still pending, applies their updaters and writes the results back with the
+   * version that was read. Paths that do not exist are created instead. A round is
+   * repeated while some write failed on BadVersion or some create failed on
+   * NodeExists; any other error settles the path as failed.
+   * 
+   * @param paths
+   *          paths to update; null or empty yields an empty list
+   * @param updaters
+   *          one updater per path; it receives the current data (null if the node
+   *          could not be read)
+   * @param pathsCreated
+   *          passed to create; if not null, receives per path the paths created for it
+   * @param stats
+   *          if not null, it is cleared and filled with one entry per path: the stat of
+   *          the write, ZNode.ZERO_STAT if the node was created instead, or null on
+   *          failure
+   * @param options
+   *          access options; must map to a create mode via AccessOption.getMode()
+   * @return one entry per path: updatedData on success or null on fail
+   * @throws IllegalArgumentException
+   *           if updaters or pathsCreated has a size different from paths
+   */
+  List<T> update(List<String> paths,
+                 List<DataUpdater<T>> updaters,
+                 List<List<String>> pathsCreated,
+                 List<Stat> stats,
+                 int options)
+  {
+    if (paths == null || paths.size() == 0)
+    {
+      LOG.error("paths is null or empty");
+      return Collections.emptyList();
+    }
+
+    if (updaters.size() != paths.size()
+        || (pathsCreated != null && pathsCreated.size() != paths.size()))
+    {
+      throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
+    }
+
+    List<Stat> setStats =
+        new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+    List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+
+    CreateMode mode = AccessOption.getMode(options);
+    if (mode == null)
+    {
+      LOG.error("Invalid update mode. options: " + options);
+      return updateData;
+    }
+
+    boolean[] needUpdate = new boolean[paths.size()];
+    Arrays.fill(needUpdate, true);
+
+    long startT = System.nanoTime();
+
+    try
+    {
+      boolean retry;
+      do
+      {
+        retry = false;
+        boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
+        boolean failOnNoNode = false;
+
+        // fresh callbacks every round: a callback left over from an earlier round
+        // would otherwise be evaluated again, wiping a stored result (stale OK) or
+        // forcing endless retries (stale BADVERSION)
+        SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
+
+        // async read all data
+        List<Stat> curStats = new ArrayList<Stat>();
+        List<T> curDataList =
+            get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
+
+        // async update
+        List<T> newDataList = new ArrayList<T>();
+        for (int i = 0; i < paths.size(); i++)
+        {
+          if (!needUpdate[i])
+          {
+            // placeholder so that newDataList stays index-aligned with paths
+            newDataList.add(null);
+            continue;
+          }
+          String path = paths.get(i);
+          DataUpdater<T> updater = updaters.get(i);
+          T newData = updater.update(curDataList.get(i));
+          newDataList.add(newData);
+          Stat curStat = curStats.get(i);
+          if (curStat == null)
+          {
+            // node not exists
+            failOnNoNode = true;
+            needCreate[i] = true;
+          }
+          else
+          {
+            cbList[i] = new SetDataCallbackHandler();
+            _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
+          }
+        }
+
+        // wait for completion
+        boolean failOnBadVersion = false;
+
+        for (int i = 0; i < paths.size(); i++)
+        {
+          SetDataCallbackHandler cb = cbList[i];
+          if (cb == null)
+            continue;
+
+          cb.waitForSuccess();
+
+          switch (Code.get(cb.getRc()))
+          {
+          case OK:
+            updateData.set(i, newDataList.get(i));
+            setStats.set(i, cb.getStat());
+            needUpdate[i] = false;
+            break;
+          case NONODE:
+            failOnNoNode = true;
+            needCreate[i] = true;
+            break;
+          case BADVERSION:
+            failOnBadVersion = true;
+            break;
+          default:
+            // if fail on error other than NoNode or BadVersion
+            // will not retry
+            needUpdate[i] = false;
+            break;
+          }
+        }
+
+        // if failOnNoNode, try create
+        if (failOnNoNode)
+        {
+          CreateCallbackHandler[] createCbList =
+              create(paths, newDataList, needCreate, pathsCreated, options);
+          for (int i = 0; i < paths.size(); i++)
+          {
+            CreateCallbackHandler createCb = createCbList[i];
+            if (createCb == null)
+            {
+              continue;
+            }
+
+            switch (Code.get(createCb.getRc()))
+            {
+            case OK:
+              needUpdate[i] = false;
+              updateData.set(i, newDataList.get(i));
+              setStats.set(i, ZNode.ZERO_STAT);
+              break;
+            case NODEEXISTS:
+              // the node appeared meanwhile: update it in the next round
+              retry = true;
+              break;
+            default:
+              // if fail on error other than NodeExists
+              // will not retry
+              needUpdate[i] = false;
+              break;
+            }
+          }
+        }
+
+        // if failOnBadVersion, retry
+        if (failOnBadVersion)
+        {
+          retry = true;
+        }
+      }
+      while (retry);
+
+      if (stats != null)
+      {
+        stats.clear();
+        stats.addAll(setStats);
+      }
+
+      return updateData;
+    }
+    finally
+    {
+      long endT = System.nanoTime();
+      LOG.info("updateData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + (endT - startT) + " ns");
+    }
+
+  }
+
+  /**
+   * Asynchronous existence check of several nodes.
+   * 
+   * @param paths
+   *          paths to check; null or empty yields an empty result
+   * @return one flag per path, true when getStats returned a stat for that path
+   */
+  @Override
+  public boolean[] exists(List<String> paths, int options)
+  {
+    Stat[] stats = getStats(paths, options);
+
+    // size by the stats array: getStats answers null/empty paths with an empty
+    // array, so paths itself must not be dereferenced here
+    boolean[] exists = new boolean[stats.length];
+    for (int i = 0; i < stats.length; i++)
+    {
+      exists[i] = (stats[i] != null);
+    }
+
+    return exists;
+  }
+
+  /**
+   * Asynchronous stat lookup of several nodes. All lookups are issued first and only
+   * then awaited.
+   * 
+   * @param options
+   *          not consulted by this method
+   * @return one entry per path, taken from the exists callback of that path; an empty
+   *         array if paths is null or empty
+   */
+  @Override
+  public Stat[] getStats(List<String> paths, int options)
+  {
+    if (paths == null || paths.size() == 0)
+    {
+      LOG.error("paths is null or empty");
+      return new Stat[0];
+    }
+
+    final int size = paths.size();
+    Stat[] result = new Stat[size];
+
+    final long startT = System.nanoTime();
+
+    try
+    {
+      // fire all lookups
+      ExistsCallbackHandler[] callbacks = new ExistsCallbackHandler[size];
+      for (int i = 0; i < size; i++)
+      {
+        ExistsCallbackHandler handler = new ExistsCallbackHandler();
+        callbacks[i] = handler;
+        _zkClient.asyncExists(paths.get(i), handler);
+      }
+
+      // collect the answers in path order
+      int idx = 0;
+      for (ExistsCallbackHandler handler : callbacks)
+      {
+        handler.waitForSuccess();
+        result[idx++] = handler._stat;
+      }
+
+      return result;
+    }
+    finally
+    {
+      long elapsed = System.nanoTime() - startT;
+      LOG.info("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Asynchronous remove of several nodes. All deletes are issued first and only then
+   * awaited.
+   * 
+   * @param options
+   *          not consulted by this method
+   * @return one flag per path, true when the delete callback of that path finished
+   *         with return code 0; an empty array if paths is null or empty
+   */
+  @Override
+  public boolean[] remove(List<String> paths, int options)
+  {
+    if (paths == null || paths.size() == 0)
+    {
+      return new boolean[0];
+    }
+
+    final int size = paths.size();
+    boolean[] removed = new boolean[size];
+    DeleteCallbackHandler[] callbacks = new DeleteCallbackHandler[size];
+
+    final long startT = System.nanoTime();
+
+    try
+    {
+      // fire all deletes
+      for (int i = 0; i < size; i++)
+      {
+        DeleteCallbackHandler handler = new DeleteCallbackHandler();
+        callbacks[i] = handler;
+        _zkClient.asyncDelete(paths.get(i), handler);
+      }
+
+      // collect the results in path order
+      int idx = 0;
+      for (DeleteCallbackHandler handler : callbacks)
+      {
+        handler.waitForSuccess();
+        removed[idx++] = handler.getRc() == 0;
+      }
+
+      return removed;
+    }
+    finally
+    {
+      long elapsed = System.nanoTime() - startT;
+      LOG.info("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
+          + ",... time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Subscribes the listener to data changes of the given path by delegating to the
+   * underlying client.
+   * 
+   * @param path
+   *          path to watch
+   * @param listener
+   *          listener to register
+   */
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener)
+  {
+    _zkClient.subscribeDataChanges(path, listener);
+  }
+
+  /**
+   * Unsubscribes the listener from data changes of the given path by delegating to
+   * the underlying client.
+   * 
+   * @param path
+   *          path the listener was registered on
+   * @param dataListener
+   *          listener to remove
+   */
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener dataListener)
+  {
+    _zkClient.unsubscribeDataChanges(path, dataListener);
+  }
+
+  /**
+   * Subscribes the listener to child changes of the given path by delegating to the
+   * underlying client.
+   * 
+   * @param path
+   *          parent path to watch
+   * @param listener
+   *          listener to register
+   * @return whatever the client's subscribeChildChanges returns (presumably the
+   *         current child names - confirm against the client)
+   */
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+  {
+    return _zkClient.subscribeChildChanges(path, listener);
+  }
+
+  /**
+   * Unsubscribes the listener from child changes of the given path by delegating to
+   * the underlying client.
+   * 
+   * @param path
+   *          parent path the listener was registered on
+   * @param childListener
+   *          listener to remove
+   */
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener childListener)
+  {
+    _zkClient.unsubscribeChildChanges(path, childListener);
+  }
+
+  /**
+   * Simple manual test: connects to localhost:2191 and exercises the asynchronous
+   * create, set and update against paths under /test, printing the outcome to
+   * standard out. Requires a reachable server at that address.
+   * 
+   * @param args
+   *          not used
+   */
+  public static void main(String[] args)
+  {
+    ZkClient zkclient = new ZkClient("localhost:2191");
+    zkclient.setZkSerializer(new ZNRecordSerializer());
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
+
+    // test async create
+    List<String> createPaths =
+        Arrays.asList("/test/child1/child1", "/test/child2/child2");
+    List<ZNRecord> createRecords =
+        Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
+
+    boolean[] needCreate = new boolean[createPaths.size()];
+    Arrays.fill(needCreate, true);
+    // one slot per path; the inner lists are allocated by create on demand
+    List<List<String>> pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
+                                                                       null));
+    accessor.create(createPaths,
+                    createRecords,
+                    needCreate,
+                    pathsCreated,
+                    AccessOption.PERSISTENT);
+    System.out.println("pathsCreated: " + pathsCreated);
+
+    // test async set
+    List<String> setPaths =
+        Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
+    List<ZNRecord> setRecords =
+        Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
+
+    pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
+                                                                       null));
+    boolean[] success =
+        accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
+    System.out.println("pathsCreated: " + pathsCreated);
+    System.out.println("setSuccess: " + Arrays.toString(success));
+
+    // test async update
+    // the second path was already written by the set test above
+    List<String> updatePaths =
+        Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
+    // updater that ignores the current data and always returns a fixed record
+    class TestUpdater implements DataUpdater<ZNRecord>
+    {
+      final ZNRecord _newData;
+
+      public TestUpdater(ZNRecord newData)
+      {
+        _newData = newData;
+      }
+
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        return _newData;
+
+      }
+    }
+    List<DataUpdater<ZNRecord>> updaters =
+        Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
+                      (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
+
+    pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
+                                                                       null));
+
+    List<ZNRecord> updateRecords =
+        accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
+    // reuses the success array of the set test; both tests use two paths
+    for (int i = 0; i < updatePaths.size(); i++)
+    {
+      success[i] = updateRecords.get(i) != null;
+    }
+    System.out.println("pathsCreated: " + pathsCreated);
+    System.out.println("updateSuccess: " + Arrays.toString(success));
+
+    System.out.println("CLOSING");
+    zkclient.close();
+  }
+
+  /**
+   * Reset. This implementation has nothing to reset, so the method is a no-op.
+   */
+  @Override
+  public void reset()
+  {
+    // Nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
new file mode 100644
index 0000000..eafe7c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -0,0 +1,984 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+
+
+public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
+{
+  private static final Logger LOG = Logger.getLogger(ZkCacheBaseDataAccessor.class);
+
+  // both caches are created by start()
+  protected WriteThroughCache<T> _wtCache;
+  protected ZkCallbackCache<T> _zkCache;
+
+  final ZkBaseDataAccessor<T> _baseAccessor;
+
+  // cache path -> cache registered for that path; populated by start()
+  final Map<String, Cache<T>> _cacheMap;
+
+  // null when no chroot is configured (constructor argument was null or "/")
+  final String _chrootPath;
+  final List<String> _wtCachePaths;
+  final List<String> _zkCachePaths;
+
+  final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
+
+  // fire listeners
+  private final ReentrantLock _eventLock = new ReentrantLock();
+  private ZkCacheEventThread _eventThread;
+
+  // only set by the constructor that takes a zk address; closed in stop()
+  private ZkClient _zkclient = null;
+
+  /**
+   * Convenience constructor: no chroot path and no zk-callback cache paths.
+   *
+   * @param baseAccessor accessor that operations are delegated to
+   * @param wtCachePaths paths to register with the write-through cache
+   */
+  public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
+                                 List<String> wtCachePaths)
+  {
+    this(baseAccessor, null, wtCachePaths, null);
+  }
+
+  /**
+   * Creates an accessor on top of an existing base accessor and starts it.
+   *
+   * @param baseAccessor accessor that operations are delegated to
+   * @param chrootPath   prefix prepended to every client path; null or "/" means no
+   *                     chroot, any other value must pass PathUtils.validatePath
+   * @param wtCachePaths paths to register with the write-through cache, may be null
+   * @param zkCachePaths paths to register with the zk-callback cache, may be null
+   */
+  public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
+                                 String chrootPath,
+                                 List<String> wtCachePaths,
+                                 List<String> zkCachePaths)
+  {
+    _baseAccessor = baseAccessor;
+
+    if (chrootPath == null || chrootPath.equals("/"))
+    {
+      _chrootPath = null;
+    }
+    else
+    {
+      PathUtils.validatePath(chrootPath);
+      _chrootPath = chrootPath;
+    }
+
+    _wtCachePaths = wtCachePaths;
+    _zkCachePaths = zkCachePaths;
+
+    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+    // Keys are ordered by path depth, so more general (i.e. shorter) prefixes come
+    // first. Paths of equal depth are ordered lexicographically; without that
+    // tie-break the TreeMap would treat them as the same key and a later put() would
+    // replace the cache registered for an earlier path of the same depth.
+    _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
+    {
+      @Override
+      public int compare(String o1, String o2)
+      {
+        int len1 = o1.split("/").length;
+        int len2 = o2.split("/").length;
+        if (len1 != len2)
+        {
+          return len1 - len2;
+        }
+        return o1.compareTo(o2);
+      }
+    });
+
+    // start() reads the final fields assigned above, so it must stay last
+    start();
+  }
+
+  /**
+   * Creates an accessor that owns its own ZkClient connection and starts it. The
+   * client created here is closed by stop().
+   *
+   * @param zkAddress    address handed to the ZkClient constructor
+   * @param serializer   serializer handed to the ZkClient constructor
+   * @param chrootPath   prefix prepended to every client path; null or "/" means no
+   *                     chroot, any other value must pass PathUtils.validatePath
+   * @param wtCachePaths paths to register with the write-through cache, may be null
+   * @param zkCachePaths paths to register with the zk-callback cache, may be null
+   */
+  public ZkCacheBaseDataAccessor(String zkAddress,
+                                 ZkSerializer serializer,
+                                 String chrootPath,
+                                 List<String> wtCachePaths,
+                                 List<String> zkCachePaths)
+  {
+    _zkclient =
+        new ZkClient(zkAddress,
+                     ZkClient.DEFAULT_SESSION_TIMEOUT,
+                     ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                     serializer);
+    _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                                 TimeUnit.MILLISECONDS);
+    _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
+
+    if (chrootPath == null || chrootPath.equals("/"))
+    {
+      _chrootPath = null;
+    }
+    else
+    {
+      PathUtils.validatePath(chrootPath);
+      _chrootPath = chrootPath;
+    }
+
+    _wtCachePaths = wtCachePaths;
+    _zkCachePaths = zkCachePaths;
+
+    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+    // Keys are ordered by path depth, so more general (i.e. shorter) prefixes come
+    // first. Paths of equal depth are ordered lexicographically; without that
+    // tie-break the TreeMap would treat them as the same key and a later put() would
+    // replace the cache registered for an earlier path of the same depth.
+    _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
+    {
+      @Override
+      public int compare(String o1, String o2)
+      {
+        int len1 = o1.split("/").length;
+        int len2 = o2.split("/").length;
+        if (len1 != len2)
+        {
+          return len1 - len2;
+        }
+        return o1.compareTo(o2);
+      }
+    });
+
+    // start() reads the final fields assigned above, so it must stay last
+    start();
+  }
+
+  /**
+   * Translates a client path into a server path by prefixing the chroot, if any.
+   *
+   * @param clientPath path as seen by the caller
+   * @return clientPath unchanged when no chroot is set; the chroot itself when
+   *         clientPath has length 1 (i.e. "/"); otherwise chroot + clientPath
+   */
+  private String prependChroot(String clientPath)
+  {
+    if (_chrootPath == null)
+    {
+      return clientPath;
+    }
+
+    // handle clientPath = "/"
+    boolean isRoot = (clientPath.length() == 1);
+    return isRoot ? _chrootPath : _chrootPath + clientPath;
+  }
+
+  /**
+   * Applies {@link #prependChroot(String)} to every element, preserving order.
+   *
+   * @param clientPaths paths as seen by the caller
+   * @return a new list holding the corresponding server paths
+   */
+  private List<String> prependChroot(List<String> clientPaths)
+  {
+    List<String> translated = new ArrayList<String>(clientPaths.size());
+    for (String path : clientPaths)
+    {
+      String serverPath = prependChroot(path);
+      translated.add(serverPath);
+    }
+    return translated;
+  }
+
+  /**
+   * Finds a path that lies under one of the registered cache paths. Cache paths are
+   * tried in the iteration order of the cache map; for each of them the candidates
+   * are scanned in list order and the first one that starts with it is returned.
+   *
+   * @param paths candidate paths
+   * @return the first matching candidate, or null if none matches
+   */
+  private String firstCachePath(List<String> paths)
+  {
+    for (String registeredPath : _cacheMap.keySet())
+    {
+      for (String candidate : paths)
+      {
+        boolean underCachePath = candidate.startsWith(registeredPath);
+        if (underCachePath)
+        {
+          return candidate;
+        }
+      }
+    }
+
+    // no candidate is covered by a cache
+    return null;
+  }
+
+  /**
+   * Looks up the cache responsible for a path.
+   *
+   * @param path path to look up
+   * @return the cache registered for the first cache path (in map order) that is a
+   *         string prefix of path, or null if there is none
+   */
+  private Cache<T> getCache(String path)
+  {
+    for (Map.Entry<String, Cache<T>> registered : _cacheMap.entrySet())
+    {
+      if (path.startsWith(registered.getKey()))
+      {
+        return registered.getValue();
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Looks up the single cache responsible for a batch of paths.
+   *
+   * Each path is resolved on its own via {@link #getCache(String)}. Paths that are
+   * not covered by any cache are ignored. A registered cache path that belongs to a
+   * different cache only matters if one of the given paths actually resolves to it;
+   * comparing against every registered cache path regardless of a match would make
+   * every batch call fail as soon as both cache kinds are configured.
+   *
+   * @param paths paths of one batch operation
+   * @return the cache shared by all covered paths, or null if no path is covered
+   * @throws IllegalArgumentException if the paths resolve to different caches
+   */
+  private Cache<T> getCache(List<String> paths)
+  {
+    Cache<T> cache = null;
+    for (String path : paths)
+    {
+      Cache<T> pathCache = getCache(path);
+      if (pathCache == null)
+      {
+        // not covered by any cache
+        continue;
+      }
+
+      if (cache == null)
+      {
+        cache = pathCache;
+      }
+      else if (cache != pathCache)
+      {
+        throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
+            + paths);
+      }
+    }
+
+    return cache;
+  }
+
+  /**
+   * Brings a cache up to date after a write.
+   *
+   * If the write reported no created paths, the cache entry for updatePath is
+   * updated with data and stat, but only when the write succeeded. Otherwise the
+   * first created path that lies under a cache path (see firstCachePath) is
+   * refreshed recursively; success is not consulted in that case.
+   *
+   * @param cache       cache to update
+   * @param createPaths paths reported as created by the write, may be null or empty
+   * @param success     whether the write succeeded
+   * @param updatePath  path that was written
+   * @param data        data that was written
+   * @param stat        stat to store with the data
+   */
+  private void updateCache(Cache<T> cache,
+                           List<String> createPaths,
+                           boolean success,
+                           String updatePath,
+                           T data,
+                           Stat stat)
+  {
+    boolean nothingCreated = (createPaths == null || createPaths.isEmpty());
+    if (nothingCreated)
+    {
+      if (success)
+      {
+        cache.update(updatePath, data, stat);
+      }
+      return;
+    }
+
+    String refreshRoot = firstCachePath(createPaths);
+    if (refreshRoot != null)
+    {
+      cache.updateRecursive(refreshRoot);
+    }
+  }
+
+  /**
+   * Creates a node. When the path is covered by a cache, the create runs under the
+   * cache's write lock and the cache is updated afterwards; otherwise the call is
+   * passed straight to the base accessor.
+   *
+   * @return true if the create returned RetCode.OK (cached path), or whatever the
+   *         base accessor returns (uncached path)
+   */
+  @Override
+  public boolean create(String path, T data, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.create(serverPath, data, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+      List<String> createdPaths = new ArrayList<String>();
+      RetCode retCode = _baseAccessor.create(serverPath, data, createdPaths, options);
+      boolean created = (retCode == RetCode.OK);
+
+      updateCache(cache, createdPaths, created, serverPath, data, ZNode.ZERO_STAT);
+
+      return created;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Sets the data of a node. When the path is covered by a cache, the write runs
+   * under the cache's write lock (with expected version -1) and the cache is updated
+   * afterwards; otherwise the call is passed straight to the base accessor.
+   */
+  @Override
+  public boolean set(String path, T data, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.set(serverPath, data, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+      Stat writtenStat = new Stat();
+      List<String> createdPaths = new ArrayList<String>();
+      boolean written =
+          _baseAccessor.set(serverPath, data, createdPaths, writtenStat, -1, options);
+
+      updateCache(cache, createdPaths, written, serverPath, data, writtenStat);
+
+      return written;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Updates a node through a DataUpdater. When the path is covered by a cache, the
+   * update runs under the cache's write lock and counts as successful when the base
+   * accessor returns non-null data; the cache is updated afterwards. Uncached paths
+   * go through the group-commit helper instead.
+   */
+  @Override
+  public boolean update(String path, DataUpdater<T> updater, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
+    }
+
+    try
+    {
+      cache.lockWrite();
+      Stat writtenStat = new Stat();
+      List<String> createdPaths = new ArrayList<String>();
+      T newData =
+          _baseAccessor.update(serverPath, updater, createdPaths, writtenStat, options);
+      boolean updated = (newData != null);
+      updateCache(cache, createdPaths, updated, serverPath, newData, writtenStat);
+
+      return updated;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Checks whether a node exists. A positive answer from the cache is returned
+   * directly; in every other case the base accessor is asked.
+   */
+  @Override
+  public boolean exists(String path, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache != null && cache.exists(serverPath))
+    {
+      return true;
+    }
+
+    // if not exists in cache, always fall back to zk
+    return _baseAccessor.exists(serverPath, options);
+  }
+
+  /**
+   * Removes a node. When the path is covered by a cache, the removal runs under the
+   * cache's write lock and, on success, the path is purged recursively from the
+   * cache; otherwise the call is passed straight to the base accessor.
+   */
+  @Override
+  public boolean remove(String path, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.remove(serverPath, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+
+      boolean removed = _baseAccessor.remove(serverPath, options);
+      if (removed)
+      {
+        cache.purgeRecursive(serverPath);
+      }
+
+      return removed;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Reads the data of a node.
+   *
+   * On a cache hit the cached data is returned (by reference) and, if stat is
+   * non-null, the cached stat is copied into it. On a cache miss the node is read
+   * from the base accessor under the cache's write lock and stored in the cache. A
+   * missing node yields null unless the caller's options ask for an exception.
+   * Paths not covered by any cache go straight to the base accessor.
+   */
+  @Override
+  public T get(String path, Stat stat, int options)
+  {
+    final String serverPath = prependChroot(path);
+    final Cache<T> cache = getCache(serverPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.get(serverPath, stat, options);
+    }
+
+    ZNode znode = cache.get(serverPath);
+    if (znode != null)
+    {
+      // TODO: shall return a deep copy instead of reference
+      T cached = ((T) znode.getData());
+      if (stat != null)
+      {
+        DataTree.copyStat(znode.getStat(), stat);
+      }
+      return cached;
+    }
+
+    // if cache miss, fall back to zk and update cache
+    T record = null;
+    try
+    {
+      cache.lockWrite();
+      // always request the exception so that a missing node never reaches cache.update
+      int readOptions = options | AccessOption.THROW_EXCEPTION_IFNOTEXIST;
+      record = _baseAccessor.get(serverPath, stat, readOptions);
+      cache.update(serverPath, record, stat);
+    }
+    catch (ZkNoNodeException e)
+    {
+      // only propagate if the caller asked for it
+      if (AccessOption.isThrowExceptionIfNotExist(options))
+      {
+        throw e;
+      }
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+
+    return record;
+  }
+
+  /**
+   * Returns the stat of a node.
+   *
+   * On a cache hit the cached stat object is returned. On a cache miss the node is
+   * read from the base accessor under the cache's write lock and stored in the
+   * cache. Paths not covered by any cache go straight to the base accessor.
+   *
+   * @return the node's stat, or null on a cache miss when the read throws
+   *         ZkNoNodeException
+   */
+  @Override
+  public Stat getStat(String path, int options)
+  {
+    String serverPath = prependChroot(path);
+
+    Cache<T> cache = getCache(serverPath);
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.getStat(serverPath, options);
+    }
+
+    ZNode znode = cache.get(serverPath);
+    if (znode != null)
+    {
+      return znode.getStat();
+    }
+
+    // if cache miss, fall back to zk and update cache
+    Stat stat = new Stat();
+    try
+    {
+      cache.lockWrite();
+      // Force the not-exist exception, as get(String, Stat, int) does. Without it the
+      // catch below depends on the caller's options, and a missing node would be
+      // stored in the cache with an untouched Stat that is then returned as if the
+      // node existed.
+      T data =
+          _baseAccessor.get(serverPath,
+                            stat,
+                            options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      cache.update(serverPath, data, stat);
+    }
+    catch (ZkNoNodeException e)
+    {
+      return null;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+
+    return stat;
+  }
+
+  /**
+   * Creates several nodes in one batch. When the paths are covered by a cache, the
+   * batch runs under the cache's write lock and the cache is updated per path; an
+   * entry succeeds when its callback's return code is Code.OK. Otherwise the call is
+   * passed straight to the base accessor.
+   */
+  @Override
+  public boolean[] createChildren(List<String> paths, List<T> records, int options)
+  {
+    final int count = paths.size();
+    final List<String> serverPaths = prependChroot(paths);
+    final Cache<T> cache = getCache(serverPaths);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.createChildren(serverPaths, records, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+
+      // every path is requested for creation
+      boolean[] needCreate = new boolean[count];
+      Arrays.fill(needCreate, true);
+
+      List<List<String>> createdPathsPerEntry =
+          new ArrayList<List<String>>(Collections.<List<String>> nCopies(count, null));
+      CreateCallbackHandler[] callbacks =
+          _baseAccessor.create(serverPaths,
+                               records,
+                               needCreate,
+                               createdPathsPerEntry,
+                               options);
+
+      boolean[] created = new boolean[count];
+      for (int idx = 0; idx < count; idx++)
+      {
+        created[idx] = (Code.get(callbacks[idx].getRc()) == Code.OK);
+
+        updateCache(cache,
+                    createdPathsPerEntry.get(idx),
+                    created[idx],
+                    serverPaths.get(idx),
+                    records.get(idx),
+                    ZNode.ZERO_STAT);
+      }
+
+      return created;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Sets the data of several nodes in one batch. When the paths are covered by a
+   * cache, the batch runs under the cache's write lock and the cache is updated per
+   * path; otherwise the call is passed straight to the base accessor.
+   */
+  @Override
+  public boolean[] setChildren(List<String> paths, List<T> records, int options)
+  {
+    final int count = paths.size();
+    final List<String> serverPaths = prependChroot(paths);
+    final Cache<T> cache = getCache(serverPaths);
+
+    if (cache == null)
+    {
+      return _baseAccessor.setChildren(serverPaths, records, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+
+      // NOTE(review): starts empty and is read by index below, so the base accessor
+      // is presumably expected to fill in one stat per path - confirm
+      List<Stat> writtenStats = new ArrayList<Stat>();
+      List<List<String>> createdPathsPerEntry =
+          new ArrayList<List<String>>(Collections.<List<String>> nCopies(count, null));
+      boolean[] written =
+          _baseAccessor.set(serverPaths, records, createdPathsPerEntry, writtenStats, options);
+
+      for (int idx = 0; idx < count; idx++)
+      {
+        updateCache(cache,
+                    createdPathsPerEntry.get(idx),
+                    written[idx],
+                    serverPaths.get(idx),
+                    records.get(idx),
+                    writtenStats.get(idx));
+      }
+
+      return written;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Updates several nodes through DataUpdaters in one batch. When the paths are
+   * covered by a cache, the batch runs under the cache's write lock; an entry
+   * succeeds when the base accessor returns non-null data for it, and the cache is
+   * updated per path. Otherwise the call is passed straight to the base accessor.
+   */
+  @Override
+  public boolean[] updateChildren(List<String> paths,
+                                  List<DataUpdater<T>> updaters,
+                                  int options)
+  {
+    final int count = paths.size();
+    final List<String> serverPaths = prependChroot(paths);
+    final Cache<T> cache = getCache(serverPaths);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.updateChildren(serverPaths, updaters, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+
+      // NOTE(review): starts empty and is read by index below, so the base accessor
+      // is presumably expected to fill in one stat per path - confirm
+      List<Stat> writtenStats = new ArrayList<Stat>();
+      boolean[] updated = new boolean[count];
+      List<List<String>> createdPathsPerEntry =
+          new ArrayList<List<String>>(Collections.<List<String>> nCopies(count, null));
+      List<T> newData =
+          _baseAccessor.update(serverPaths,
+                               updaters,
+                               createdPathsPerEntry,
+                               writtenStats,
+                               options);
+
+      for (int idx = 0; idx < count; idx++)
+      {
+        T entryData = newData.get(idx);
+        updated[idx] = (entryData != null);
+        updateCache(cache,
+                    createdPathsPerEntry.get(idx),
+                    updated[idx],
+                    serverPaths.get(idx),
+                    entryData,
+                    writtenStats.get(idx));
+      }
+      return updated;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  // TODO: change to use async_exists
+  /**
+   * Checks existence for several paths, one at a time.
+   *
+   * The client paths are handed to {@link #exists(String, int)} as they are: that
+   * method prepends the chroot itself, so prefixing them here as well would apply
+   * the chroot twice whenever one is configured.
+   *
+   * @param paths   client paths to check
+   * @param options access options, passed through unchanged
+   * @return one flag per path, in the order of paths
+   */
+  @Override
+  public boolean[] exists(List<String> paths, int options)
+  {
+    final int size = paths.size();
+
+    boolean exists[] = new boolean[size];
+    for (int i = 0; i < size; i++)
+    {
+      exists[i] = exists(paths.get(i), options);
+    }
+    return exists;
+  }
+
+  /**
+   * Removes several nodes in one batch. When the paths are covered by a cache, the
+   * batch runs under the cache's write lock and every successfully removed path is
+   * purged recursively from the cache; otherwise the call is passed straight to the
+   * base accessor.
+   */
+  @Override
+  public boolean[] remove(List<String> paths, int options)
+  {
+    final int count = paths.size();
+    final List<String> serverPaths = prependChroot(paths);
+    final Cache<T> cache = getCache(serverPaths);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.remove(serverPaths, options);
+    }
+
+    try
+    {
+      cache.lockWrite();
+
+      boolean[] removed = _baseAccessor.remove(serverPaths, options);
+
+      for (int idx = 0; idx < count; idx++)
+      {
+        if (!removed[idx])
+        {
+          continue;
+        }
+        cache.purgeRecursive(serverPaths.get(idx));
+      }
+      return removed;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Reads the data of several nodes.
+   *
+   * With a cache, entries found in the cache are served from it under the read
+   * lock. The remaining entries are then read from the base accessor under the write
+   * lock and stored in the cache. If stats is non-null it is cleared and refilled
+   * with one stat per path. Without a cache the call goes straight to the base
+   * accessor.
+   *
+   * @return an empty list when paths is null or empty, otherwise one record per path
+   */
+  @Override
+  public List<T> get(List<String> paths, List<Stat> stats, int options)
+  {
+    if (paths == null || paths.isEmpty())
+    {
+      return Collections.emptyList();
+    }
+
+    final int count = paths.size();
+    final List<String> serverPaths = prependChroot(paths);
+
+    final Cache<T> cache = getCache(serverPaths);
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.get(serverPaths, stats, options);
+    }
+
+    List<T> records = new ArrayList<T>(Collections.<T> nCopies(count, null));
+    List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(count, null));
+
+    boolean anyMiss = false;
+    boolean missed[] = new boolean[count]; // init to false
+
+    // pass 1: serve what the cache already has
+    try
+    {
+      cache.lockRead();
+      for (int idx = 0; idx < count; idx++)
+      {
+        ZNode cachedNode = cache.get(serverPaths.get(idx));
+        if (cachedNode == null)
+        {
+          anyMiss = true;
+          missed[idx] = true;
+          continue;
+        }
+
+        // TODO: shall return a deep copy instead of reference
+        records.set(idx, (T) cachedNode.getData());
+        readStats.set(idx, cachedNode.getStat());
+      }
+    }
+    finally
+    {
+      cache.unlockRead();
+    }
+
+    // pass 2: cache miss, fall back to zk and update cache
+    if (anyMiss)
+    {
+      cache.lockWrite();
+      try
+      {
+        List<T> fetched = _baseAccessor.get(serverPaths, readStats, missed);
+        for (int idx = 0; idx < count; idx++)
+        {
+          if (missed[idx])
+          {
+            T fetchedRecord = fetched.get(idx);
+            records.set(idx, fetchedRecord);
+            cache.update(serverPaths.get(idx), fetchedRecord, readStats.get(idx));
+          }
+        }
+      }
+      finally
+      {
+        cache.unlockWrite();
+      }
+    }
+
+    if (stats != null)
+    {
+      stats.clear();
+      stats.addAll(readStats);
+    }
+
+    return records;
+  }
+
+  // TODO: add cache
+  /**
+   * Returns the stats of several nodes. The cache is not consulted; the chrooted
+   * paths are handed directly to the base accessor.
+   */
+  @Override
+  public Stat[] getStats(List<String> paths, int options)
+  {
+    return _baseAccessor.getStats(prependChroot(paths), options);
+  }
+
+  /**
+   * Returns the names of a node's children.
+   *
+   * A cached node answers the call only if its child set is not the shared
+   * Collections.emptySet() instance (compared by reference); the names are then
+   * returned as a sorted copy. Otherwise the names are read from the base accessor
+   * under the cache's write lock and added to the cached parent's child set. Paths
+   * not covered by any cache go straight to the base accessor.
+   */
+  @Override
+  public List<String> getChildNames(String parentPath, int options)
+  {
+    final String serverParentPath = prependChroot(parentPath);
+    final Cache<T> cache = getCache(serverParentPath);
+
+    if (cache == null)
+    {
+      // no cache
+      return _baseAccessor.getChildNames(serverParentPath, options);
+    }
+
+    ZNode parentNode = cache.get(serverParentPath);
+    boolean cacheHit =
+        (parentNode != null && parentNode.getChildSet() != Collections.<String> emptySet());
+
+    if (cacheHit)
+    {
+      List<String> sortedNames = new ArrayList<String>(parentNode.getChildSet());
+      Collections.sort(sortedNames);
+      return sortedNames;
+    }
+
+    // cache miss: read from zk and remember the children
+    try
+    {
+      cache.lockWrite();
+
+      List<String> namesFromZk = _baseAccessor.getChildNames(serverParentPath, options);
+      cache.addToParentChildSet(serverParentPath, namesFromZk);
+
+      return namesFromZk;
+    }
+    finally
+    {
+      cache.unlockWrite();
+    }
+  }
+
+  /**
+   * Reads the data of all children of a node: resolves the child names, builds the
+   * client path of each child and delegates to the batch get.
+   *
+   * @return null if the child names cannot be determined (getChildNames returned
+   *         null), otherwise the result of the batch get
+   */
+  @Override
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+  {
+    List<String> names = getChildNames(parentPath, options);
+    if (names == null)
+    {
+      return null;
+    }
+
+    // client paths: the batch get below prepends the chroot itself
+    List<String> childPaths = new ArrayList<String>();
+    for (String name : names)
+    {
+      childPaths.add(parentPath + "/" + name);
+    }
+
+    return get(childPaths, stats, options);
+  }
+
+  /**
+   * Registers a data listener on the chrooted path with the base accessor.
+   */
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener)
+  {
+    _baseAccessor.subscribeDataChanges(prependChroot(path), listener);
+  }
+
+  /**
+   * Removes a data listener for the chrooted path from the base accessor.
+   */
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener)
+  {
+    _baseAccessor.unsubscribeDataChanges(prependChroot(path), listener);
+  }
+
+  /**
+   * Registers a child listener on the chrooted path with the base accessor and
+   * returns whatever the base accessor returns.
+   */
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+  {
+    return _baseAccessor.subscribeChildChanges(prependChroot(path), listener);
+  }
+
+  /**
+   * Removes a child listener for the chrooted path from the base accessor.
+   */
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener)
+  {
+    _baseAccessor.unsubscribeChildChanges(prependChroot(path), listener);
+  }
+
+  /**
+   * Registers a property listener for the chrooted path with the zk-callback cache.
+   */
+  @Override
+  public void subscribe(String parentPath, HelixPropertyListener listener)
+  {
+    _zkCache.subscribe(prependChroot(parentPath), listener);
+  }
+
+  /**
+   * Removes a property listener for the chrooted path from the zk-callback cache.
+   */
+  @Override
+  public void unsubscribe(String parentPath, HelixPropertyListener listener)
+  {
+    _zkCache.unsubscribe(prependChroot(parentPath), listener);
+  }
+
+  /**
+   * Starts the accessor: launches the cache event thread (only when zk-cache paths
+   * are configured and no thread is running yet), creates both caches and registers
+   * every configured cache path in the cache map.
+   *
+   * If the calling thread is interrupted while waiting for the event lock, the
+   * event thread is not started, the interrupt status is restored and the caches are
+   * still created.
+   */
+  @Override
+  public void start()
+  {
+
+    LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
+        + ", " + _zkCachePaths);
+
+    // start event thread
+    // Tracks whether the lock was actually acquired: if lockInterruptibly() throws,
+    // the lock is not held and unlocking it would raise IllegalMonitorStateException.
+    boolean locked = false;
+    try
+    {
+      _eventLock.lockInterruptibly();
+      locked = true;
+      if (_eventThread != null)
+      {
+        LOG.warn(_eventThread + " has already started");
+      }
+      else
+      {
+
+        if (_zkCachePaths == null || _zkCachePaths.isEmpty())
+        {
+          LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
+        }
+        else
+        {
+          LOG.debug("Starting ZkCacheEventThread...");
+
+          _eventThread = new ZkCacheEventThread("");
+          _eventThread.start();
+        }
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // keep the interrupt visible to the caller instead of silently dropping it
+      Thread.currentThread().interrupt();
+      LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
+    }
+    finally
+    {
+      if (locked)
+      {
+        _eventLock.unlock();
+      }
+    }
+    LOG.debug("Start ZkCacheEventThread...done");
+
+    _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
+    _zkCache =
+        new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
+
+    if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
+    {
+      for (String path : _wtCachePaths)
+      {
+        _cacheMap.put(path, _wtCache);
+      }
+    }
+
+    if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
+    {
+      for (String path : _zkCachePaths)
+      {
+        _cacheMap.put(path, _zkCache);
+      }
+    }
+  }
+
+  /**
+   * Stops the accessor: closes the ZkClient owned by this instance (if any), then
+   * interrupts the cache event thread and waits up to 2000 ms for it to finish.
+   *
+   * If the calling thread is interrupted while waiting for the event lock or for
+   * the event thread, the interrupt status is restored before returning.
+   */
+  @Override
+  public void stop()
+  {
+    // Tracks whether the lock was actually acquired: if lockInterruptibly() throws,
+    // the lock is not held and unlocking it would raise IllegalMonitorStateException.
+    boolean locked = false;
+    try
+    {
+      _eventLock.lockInterruptibly();
+      locked = true;
+
+      if (_zkclient != null)
+      {
+        _zkclient.close();
+        _zkclient = null;
+      }
+
+      if (_eventThread == null)
+      {
+        LOG.warn(_eventThread + " has already stopped");
+        return;
+      }
+
+      LOG.debug("Stopping ZkCacheEventThread...");
+      _eventThread.interrupt();
+      _eventThread.join(2000);
+      _eventThread = null;
+    }
+    catch (InterruptedException e)
+    {
+      // keep the interrupt visible to the caller instead of silently dropping it
+      Thread.currentThread().interrupt();
+      LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.", e);
+    }
+    finally
+    {
+      if (locked)
+      {
+        _eventLock.unlock();
+      }
+    }
+
+    LOG.debug("Stop ZkCacheEventThread...done");
+
+  }
+  
+  /**
+   * Resets both caches, skipping any cache that has not been created.
+   */
+  @Override
+  public void reset()
+  {
+    // write-through cache
+    if (_wtCache != null)
+    {
+      _wtCache.reset();
+    }
+
+    // zk-callback cache
+    if (_zkCache != null)
+    {
+      _zkCache.reset();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
new file mode 100644
index 0000000..6424719
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
@@ -0,0 +1,88 @@
+package org.apache.helix.manager.zk;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.log4j.Logger;
+
+// copy from ZkEventThread
+public class ZkCacheEventThread extends Thread
+{
+
+  private static final Logger          LOG      =
+                                                    Logger.getLogger(ZkCacheEventThread.class);
+  private final BlockingQueue<ZkCacheEvent> _events  = new LinkedBlockingQueue<ZkCacheEvent>();
+  private static AtomicInteger         _eventId = new AtomicInteger(0);
+
+  static abstract class ZkCacheEvent
+  {
+
+    private final String _description;
+
+    public ZkCacheEvent(String description)
+    {
+      _description = description;
+    }
+
+    public abstract void run() throws Exception;
+
+    @Override
+    public String toString()
+    {
+      return "ZkCacheEvent[" + _description + "]";
+    }
+  }
+
+  ZkCacheEventThread(String name)
+  {
+    setDaemon(true);
+    setName("ZkCache-EventThread-" + getId() + "-" + name);
+  }
+
+  @Override
+  public void run()
+  {
+    LOG.info("Starting ZkCache event thread.");
+    try
+    {
+      while (!isInterrupted())
+      {
+        ZkCacheEvent zkEvent = _events.take();
+        int eventId = _eventId.incrementAndGet();
+        LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+        try
+        {
+          zkEvent.run();
+        }
+        catch (InterruptedException e)
+        {
+          interrupt();
+        }
+        catch (ZkInterruptedException e)
+        {
+          interrupt();
+        }
+        catch (Throwable e)
+        {
+          LOG.error("Error handling event " + zkEvent, e);
+        }
+        LOG.debug("Delivering event #" + eventId + " done");
+      }
+    }
+    catch (InterruptedException e)
+    {
+      LOG.info("Terminate ZkClient event thread.");
+    }
+  }
+
+  public void send(ZkCacheEvent event)
+  {
+    if (!isInterrupted())
+    {
+      LOG.debug("New event: " + event);
+      _events.add(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
new file mode 100644
index 0000000..36c4fa7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -0,0 +1,348 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ZkCallbackCache<T> extends Cache<T> implements
+    IZkChildListener,
+    IZkDataListener,
+    IZkStateListener
+{
+  private static Logger LOG = Logger.getLogger(ZkCallbackCache.class);
+
+  final BaseDataAccessor<T> _accessor;
+  final String _chrootPath;
+
+  private final ZkCacheEventThread _eventThread;
+  private final Map<String, Set<HelixPropertyListener>> _listener;
+
+  /**
+   * Builds the cache and eagerly loads every sub-tree rooted at the given paths.
+   *
+   * @param accessor    accessor used to read znodes and to subscribe/unsubscribe
+   *                    this cache as a listener
+   * @param chrootPath  prefix stripped from paths before they are reported to
+   *                    listeners; may be null
+   * @param paths       root paths to load recursively; null or empty loads nothing
+   * @param eventThread thread to which listener notifications are sent
+   */
+  public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath,
+                         List<String> paths, ZkCacheEventThread eventThread)
+  {
+    super();
+    _accessor = accessor;
+    _chrootPath = chrootPath;
+    _eventThread = eventThread;
+    _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
+
+    // nothing to preload
+    if (paths == null)
+    {
+      return;
+    }
+
+    for (String rootPath : paths)
+    {
+      updateRecursive(rootPath);
+    }
+  }
+
+  /**
+   * Stores {@code data}/{@code stat} for {@code path} in the cache and fires the
+   * matching event.
+   * <ul>
+   * <li>path not cached yet: a new entry is added and NodeCreated is fired;</li>
+   * <li>cached with a different czxid: NodeDeleted followed by NodeCreated is fired;</li>
+   * <li>cached with the same czxid but a different version: NodeDataChanged is fired;</li>
+   * <li>otherwise the entry is refreshed without firing anything.</li>
+   * </ul>
+   * The child name is also passed to {@code addToParentChildSet} for the parent path.
+   *
+   * @param path znode path
+   * @param data payload to cache
+   * @param stat stat to cache; compared against the previously cached stat
+   */
+  @Override
+  public void update(String path, T data, Stat stat)
+  {
+    final File pathAsFile = new File(path);
+    addToParentChildSet(pathAsFile.getParent(), pathAsFile.getName());
+
+    final ZNode cached = _cache.get(path);
+    if (cached == null)
+    {
+      _cache.put(path, new ZNode(path, data, stat));
+      fireEvents(path, EventType.NodeCreated);
+      return;
+    }
+
+    // capture the old stat before overwriting the entry
+    final Stat previous = cached.getStat();
+    cached.setData(data);
+    cached.setStat(stat);
+
+    final boolean recreated = previous.getCzxid() != stat.getCzxid();
+    if (recreated)
+    {
+      fireEvents(path, EventType.NodeDeleted);
+      fireEvents(path, EventType.NodeCreated);
+    }
+    else if (previous.getVersion() != stat.getVersion())
+    {
+      fireEvents(path, EventType.NodeDataChanged);
+    }
+  }
+
+  // TODO: make readData async
+  /**
+   * Loads {@code path} and, recursively, every child not yet recorded on the cached
+   * node, subscribing this cache for data and child changes along the way.
+   * <p>
+   * A missing znode is tolerated: the data subscription and the child subscription
+   * are still made, but nothing is cached for the path.
+   *
+   * @param path root of the sub-tree to load; null is ignored
+   */
+  @Override
+  public void updateRecursive(String path)
+  {
+    if (path == null)
+    {
+      return;
+    }
+
+    // Acquire the lock before entering try, so the finally block can never unlock
+    // a lock this thread does not hold.
+    _lock.writeLock().lock();
+    try
+    {
+      try
+      {
+        // subscribe changes before read
+        _accessor.subscribeDataChanges(path, this);
+
+        // update this node
+        Stat stat = new Stat();
+        T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+        update(path, readData, stat);
+      }
+      catch (ZkNoNodeException e)
+      {
+        // OK. znode not exists
+        // we still need to subscribe child change
+      }
+
+      // recursively update children nodes if not exists
+      ZNode znode = _cache.get(path);
+      List<String> childNames = _accessor.subscribeChildChanges(path, this);
+      if (childNames == null || childNames.isEmpty())
+      {
+        return;
+      }
+
+      if (znode == null)
+      {
+        // The read above found no node, yet children are reported now: the node was
+        // created in between. There is no cached parent to attach the children to,
+        // so skip instead of failing with a NullPointerException.
+        // NOTE(review): presumably a later change callback re-runs this method for
+        // the path - confirm against the subscription semantics of the accessor.
+        LOG.warn("Skip caching children of " + path
+            + ": node is not in cache but has children " + childNames);
+        return;
+      }
+
+      for (String childName : childNames)
+      {
+        if (!znode.hasChild(childName))
+        {
+          String childPath = path + "/" + childName;
+          znode.addChild(childName);
+          updateRecursive(childPath);
+        }
+      }
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Child-change callback: reloads the sub-tree under {@code parentPath}.
+   *
+   * @param parentPath    path whose children changed
+   * @param currentChilds current child names; null when the subscribed node itself
+   *                      was deleted, in which case nothing is done
+   */
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+  {
+    // this is also invoked if subscribed for childChange and the node gets deleted;
+    // a null child list marks that case
+    if (currentChilds != null)
+    {
+      updateRecursive(parentPath);
+    }
+  }
+
+  /**
+   * Data-change callback: re-reads {@code dataPath} through the accessor, refreshes
+   * the cached entry and fires the matching event.
+   * <p>
+   * The {@code data} argument is not used; the node is always re-read together with
+   * its stat. If the path is not cached, nothing is updated or fired.
+   *
+   * @param dataPath path of the changed znode
+   * @param data     payload delivered with the callback; unused
+   * @throws Exception if re-reading the node fails (the read is made with
+   *                   {@code THROW_EXCEPTION_IFNOTEXIST})
+   */
+  @Override
+  public void handleDataChange(String dataPath, Object data) throws Exception
+  {
+    // Acquire the lock before entering try, so the finally block can never unlock
+    // a lock this thread does not hold.
+    _lock.writeLock().lock();
+    try
+    {
+      // TODO: optimize it by get stat from callback
+      Stat stat = new Stat();
+      Object readData =
+          _accessor.get(dataPath, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+      ZNode znode = _cache.get(dataPath);
+      if (znode == null)
+      {
+        // we may see dataChange on child before childChange on parent
+        // in this case, let childChange update cache
+        return;
+      }
+
+      // capture the old stat before overwriting the entry
+      Stat oldStat = znode.getStat();
+      znode.setData(readData);
+      znode.setStat(stat);
+
+      // if create right after delete, and zkCallback comes after create
+      // no DataDelete() will be fired, instead will fire 2 DataChange()
+      // see ZkClient.fireDataChangedEvents()
+      if (oldStat.getCzxid() != stat.getCzxid())
+      {
+        fireEvents(dataPath, EventType.NodeDeleted);
+        fireEvents(dataPath, EventType.NodeCreated);
+      }
+      else if (oldStat.getVersion() != stat.getVersion())
+      {
+        fireEvents(dataPath, EventType.NodeDataChanged);
+      }
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Data-deleted callback: drops {@code dataPath} from the cache, removes this
+   * cache's data and child subscriptions for it, and fires NodeDeleted.
+   *
+   * @param dataPath path of the deleted znode
+   */
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception
+  {
+    // Acquire the lock before entering try, so the finally block can never unlock
+    // a lock this thread does not hold.
+    _lock.writeLock().lock();
+    try
+    {
+      _accessor.unsubscribeDataChanges(dataPath, this);
+      _accessor.unsubscribeChildChanges(dataPath, this);
+
+      // detach the node from its parent's child set, then forget the node itself
+      File deleted = new File(dataPath);
+      removeFromParentChildSet(deleted.getParent(), deleted.getName());
+      _cache.remove(dataPath);
+
+      fireEvents(dataPath, EventType.NodeDeleted);
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void handleStateChanged(KeeperState state) throws Exception
+  {
+    // No-op: this cache currently does nothing when the connection state changes.
+
+  }
+
+  @Override
+  public void handleNewSession() throws Exception
+  {
+    // No-op: this cache currently does nothing when a new session is established.
+
+  }
+
+  /**
+   * Registers {@code listener} for events on {@code path}. As implemented by
+   * {@code fireEvents}, a listener also receives events fired for paths below the
+   * one it is registered on.
+   *
+   * @param path     path to listen on
+   * @param listener listener to add
+   */
+  public void subscribe(String path, HelixPropertyListener listener)
+  {
+    synchronized (_listener)
+    {
+      // create the per-path listener set on first use
+      if (!_listener.containsKey(path))
+      {
+        _listener.put(path, new CopyOnWriteArraySet<HelixPropertyListener>());
+      }
+      _listener.get(path).add(listener);
+    }
+  }
+
+  /**
+   * Removes {@code childListener} from the listeners registered on {@code path}.
+   * Does nothing when no listener set exists for the path.
+   *
+   * @param path          path the listener was registered on
+   * @param childListener listener to remove
+   */
+  public void unsubscribe(String path, HelixPropertyListener childListener)
+  {
+    synchronized (_listener)
+    {
+      final Set<HelixPropertyListener> registered = _listener.get(path);
+      if (registered == null)
+      {
+        return;
+      }
+      registered.remove(childListener);
+    }
+  }
+
+  private void fireEvents(final String path, EventType type)
+  {
+    String tmpPath = path;
+    final String clientPath =
+        (_chrootPath == null ? path : (_chrootPath.equals(path) ? "/"
+            : path.substring(_chrootPath.length())));
+
+    while (tmpPath != null)
+    {
+      Set<HelixPropertyListener> listeners = _listener.get(tmpPath);
+
+      if (listeners != null && !listeners.isEmpty())
+      {
+        for (final HelixPropertyListener listener : listeners)
+        {
+          try
+          {
+            switch (type)
+            {
+            case NodeDataChanged:
+              // listener.onDataChange(path);
+              _eventThread.send(new ZkCacheEvent("dataChange on " + path + " send to "
+                  + listener)
+              {
+                @Override
+                public void run() throws Exception
+                {
+                  listener.onDataChange(clientPath);
+                }
+              });
+              break;
+            case NodeCreated:
+              // listener.onDataCreate(path);
+              _eventThread.send(new ZkCacheEvent("dataCreate on " + path + " send to "
+                  + listener)
+              {
+                @Override
+                public void run() throws Exception
+                {
+                  listener.onDataCreate(clientPath);
+                }
+              });
+              break;
+            case NodeDeleted:
+              // listener.onDataDelete(path);
+              _eventThread.send(new ZkCacheEvent("dataDelete on " + path + " send to "
+                  + listener)
+              {
+                @Override
+                public void run() throws Exception
+                {
+                  listener.onDataDelete(clientPath);
+                }
+              });
+              break;
+            default:
+              break;
+            }
+          }
+          catch (Exception e)
+          {
+            LOG.error("Exception in handle events.", e);
+          }
+        }
+      }
+
+      tmpPath = new File(tmpPath).getParent();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
new file mode 100644
index 0000000..e1531a1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -0,0 +1,445 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Extension of the I0Itec ZkClient that adds functionality the upstream client does
+ * not provide. It is the place for quick fixes when a bug is found in the upstream
+ * ZkClient, or for additional features that cannot wait for a new ZkClient jar.
+ * Ideally, the changes made here should be committed back to the upstream ZkClient.
+ *
+ * @author kgopalak
+ *
+ */
+
+public class ZkClient extends org.I0Itec.zkclient.ZkClient
+{
+  private static Logger LOG = Logger.getLogger(ZkClient.class);
+  public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
+  public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+  // public static String sessionId;
+  // public static String sessionPassword;
+
+  private PathBasedZkSerializer _zkSerializer;
+
+  /**
+   * Primary constructor; every other constructor ends up here.
+   * <p>
+   * The superclass is given a {@code ByteArraySerializer}, while the supplied
+   * path-based serializer is kept in this class for {@code serialize} and
+   * {@code deserialize}. The creating call stack is logged at INFO level.
+   *
+   * @param connection        connection handed to the superclass
+   * @param connectionTimeout connection timeout handed to the superclass
+   * @param zkSerializer      path-aware serializer used by this client
+   */
+  public ZkClient(IZkConnection connection, int connectionTimeout,
+                  PathBasedZkSerializer zkSerializer)
+  {
+    super(connection, connectionTimeout, new ByteArraySerializer());
+    this._zkSerializer = zkSerializer;
+
+    LOG.info("create a new zkclient. "
+        + Arrays.asList(Thread.currentThread().getStackTrace()));
+  }
+
+  /**
+   * Same as the path-based constructor, wrapping {@code zkSerializer} in a
+   * {@code BasicZkSerializer}.
+   */
+  public ZkClient(IZkConnection connection, int connectionTimeout,
+                  ZkSerializer zkSerializer)
+  {
+    this(connection, connectionTimeout, new BasicZkSerializer(zkSerializer));
+  }
+
+  /**
+   * Uses a {@code SerializableSerializer} as the serializer.
+   */
+  public ZkClient(IZkConnection connection, int connectionTimeout)
+  {
+    this(connection, connectionTimeout, new SerializableSerializer());
+  }
+
+  /**
+   * Uses {@code Integer.MAX_VALUE} as the connection timeout and a
+   * {@code SerializableSerializer} as the serializer.
+   */
+  public ZkClient(IZkConnection connection)
+  {
+    this(connection, Integer.MAX_VALUE, new SerializableSerializer());
+  }
+
+  /**
+   * Opens a new {@code ZkConnection} to {@code zkServers} with the given session
+   * timeout and uses the given plain serializer.
+   */
+  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+                  ZkSerializer zkSerializer)
+  {
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
+  }
+
+  /**
+   * Opens a new {@code ZkConnection} to {@code zkServers} with the given session
+   * timeout and uses the given path-based serializer.
+   */
+  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+                  PathBasedZkSerializer zkSerializer)
+  {
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
+  }
+
+  /**
+   * Opens a new {@code ZkConnection} to {@code zkServers} with the given session
+   * timeout and uses a {@code SerializableSerializer}.
+   */
+  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
+  {
+    this(new ZkConnection(zkServers, sessionTimeout),
+         connectionTimeout,
+         new SerializableSerializer());
+  }
+
+  /**
+   * Opens a new {@code ZkConnection} to {@code zkServers} (no explicit session
+   * timeout) and uses a {@code SerializableSerializer}.
+   */
+  public ZkClient(String zkServers, int connectionTimeout)
+  {
+    this(new ZkConnection(zkServers), connectionTimeout, new SerializableSerializer());
+  }
+
+  /**
+   * Opens a new {@code ZkConnection} to {@code zkServers} (no explicit session
+   * timeout), with {@code Integer.MAX_VALUE} as the connection timeout and a
+   * {@code SerializableSerializer} as the serializer.
+   */
+  public ZkClient(String zkServers)
+  {
+    this(new ZkConnection(zkServers), Integer.MAX_VALUE, new SerializableSerializer());
+  }
+
+  /**
+   * Replaces the serializer used by this client, wrapping the given plain serializer
+   * in a {@code BasicZkSerializer}.
+   *
+   * @param zkSerializer serializer to wrap and use from now on
+   */
+  @Override
+  public void setZkSerializer(ZkSerializer zkSerializer)
+  {
+    final PathBasedZkSerializer wrapped = new BasicZkSerializer(zkSerializer);
+    this._zkSerializer = wrapped;
+  }
+
+  /**
+   * Replaces the path-based serializer used by this client.
+   *
+   * @param zkSerializer serializer to use from now on
+   */
+  public void setZkSerializer(PathBasedZkSerializer zkSerializer)
+  {
+    this._zkSerializer = zkSerializer;
+  }
+
+  /**
+   * @return the connection object this client operates on
+   */
+  public IZkConnection getConnection()
+  {
+    return this._connection;
+  }
+
+  /**
+   * Logs the closing call stack at INFO level and then closes the client.
+   *
+   * @throws ZkInterruptedException as declared by the superclass {@code close()}
+   */
+  @Override
+  public void close() throws ZkInterruptedException
+  {
+    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+
+    // This value is only used for the log line. Do not assume the connection is a
+    // ZkConnection: the constructors accept any IZkConnection, and a failed cast
+    // here would throw ClassCastException and keep super.close() from running.
+    Object zookeeper;
+    if (_connection == null)
+    {
+      zookeeper = "null";
+    }
+    else if (_connection instanceof ZkConnection)
+    {
+      zookeeper = ((ZkConnection) _connection).getZookeeper();
+    }
+    else
+    {
+      zookeeper = _connection;
+    }
+
+    LOG.info("closing a zkclient. zookeeper: " + zookeeper
+        + ", callStack: " + Arrays.asList(calls));
+
+    super.close();
+  }
+
+  /**
+   * Fetches the stat of {@code path} with a ZooKeeper {@code exists} call (no watch),
+   * executed through {@code retryUntilConnected}. The elapsed time is logged at INFO
+   * level.
+   *
+   * @param path znode path
+   * @return whatever {@code exists} returns for the path
+   */
+  public Stat getStat(final String path)
+  {
+    final long begin = System.nanoTime();
+
+    try
+    {
+      return retryUntilConnected(new Callable<Stat>()
+      {
+        @Override
+        public Stat call() throws Exception
+        {
+          final ZkConnection zkConnection = (ZkConnection) _connection;
+          return zkConnection.getZookeeper().exists(path, false);
+        }
+      });
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("exists, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Overrides {@code exists(path, watch)} so that every exists request is timed and
+   * logged at INFO level.
+   *
+   * @param path  znode path
+   * @param watch passed through to the connection's {@code exists}
+   * @return the result of the connection's {@code exists} call
+   */
+  @Override
+  protected boolean exists(final String path, final boolean watch)
+  {
+    final long begin = System.nanoTime();
+
+    try
+    {
+      final Callable<Boolean> existsCall = new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          return _connection.exists(path, watch);
+        }
+      };
+      return retryUntilConnected(existsCall);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("exists, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Overrides {@code getChildren(path, watch)} so that every getChildren request is
+   * timed and logged at INFO level.
+   *
+   * @param path  znode path
+   * @param watch passed through to the connection's {@code getChildren}
+   * @return the child names returned by the connection
+   */
+  @Override
+  protected List<String> getChildren(final String path, final boolean watch)
+  {
+    final long begin = System.nanoTime();
+
+    try
+    {
+      final Callable<List<String>> childrenCall = new Callable<List<String>>()
+      {
+        @Override
+        public List<String> call() throws Exception
+        {
+          return _connection.getChildren(path, watch);
+        }
+      };
+      return retryUntilConnected(childrenCall);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("getChildren, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Converts raw znode bytes into an object using this client's path-based
+   * serializer.
+   *
+   * @param data raw bytes; may be null
+   * @param path znode path the bytes belong to, handed to the serializer
+   * @return the deserialized object, or null when {@code data} is null
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T deserialize(byte[] data, String path)
+  {
+    // null bytes never reach the serializer
+    return (data == null) ? null : (T) _zkSerializer.deserialize(data, path);
+  }
+
+  /**
+   * Overrides {@code readData(path, stat, watch)} so that every read request is timed
+   * and logged at INFO level. The bytes read from the connection are converted with
+   * {@link #deserialize(byte[], String)}.
+   *
+   * @param path  znode path
+   * @param stat  passed through to the connection's {@code readData}
+   * @param watch passed through to the connection's {@code readData}
+   * @return the deserialized content, or null when the connection returned null
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected <T extends Object> T readData(final String path,
+                                          final Stat stat,
+                                          final boolean watch)
+  {
+    final long begin = System.nanoTime();
+    try
+    {
+      final Callable<byte[]> readCall = new Callable<byte[]>()
+      {
+        @Override
+        public byte[] call() throws Exception
+        {
+          return _connection.readData(path, stat, watch);
+        }
+      };
+      final byte[] raw = retryUntilConnected(readCall);
+      return (T) deserialize(raw, path);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("getData, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Reads the content of {@code path} through the superclass {@code readData(path,
+   * stat)}, optionally turning a missing node into a null result.
+   *
+   * @param path                      znode path
+   * @param stat                      passed through to the superclass read
+   * @param returnNullIfPathNotExists when true, a {@code ZkNoNodeException} is
+   *                                  swallowed and null is returned; when false it is
+   *                                  rethrown
+   * @return the data read, or null as described above
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readDataAndStat(String path,
+                                              Stat stat,
+                                              boolean returnNullIfPathNotExists)
+  {
+    try
+    {
+      return (T) super.readData(path, stat);
+    }
+    catch (ZkNoNodeException e)
+    {
+      if (returnNullIfPathNotExists)
+      {
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * @return the server string reported by the underlying connection
+   */
+  public String getServers()
+  {
+    return this._connection.getServers();
+  }
+
+  /**
+   * Converts an object into znode bytes using this client's path-based serializer.
+   *
+   * @param data object to serialize
+   * @param path znode path the data is meant for, handed to the serializer
+   * @return the bytes produced by the serializer
+   */
+  public byte[] serialize(Object data, String path)
+  {
+    return this._zkSerializer.serialize(data, path);
+  }
+
+  /**
+   * Serializes {@code datat} with the path-based serializer and writes it to
+   * {@code path} through {@code retryUntilConnected}. The elapsed time (including
+   * serialization) is logged at INFO level.
+   *
+   * @param path            znode path
+   * @param datat           object to write
+   * @param expectedVersion version handed to the connection's {@code writeData}
+   */
+  @Override
+  public void writeData(final String path, Object datat, final int expectedVersion)
+  {
+    final long begin = System.nanoTime();
+    try
+    {
+      final byte[] payload = serialize(datat, path);
+
+      final Callable<Object> writeCall = new Callable<Object>()
+      {
+        @Override
+        public Object call() throws Exception
+        {
+          _connection.writeData(path, payload, expectedVersion);
+          return null;
+        }
+      };
+      retryUntilConnected(writeCall);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("setData, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Serializes {@code datat} and writes it with a direct ZooKeeper {@code setData}
+   * call, returning the stat produced by that call. Unlike {@code writeData}, the
+   * call is not wrapped in {@code retryUntilConnected}. The elapsed time is logged at
+   * INFO level.
+   *
+   * @param path            znode path
+   * @param datat           object to write
+   * @param expectedVersion version handed to {@code setData}
+   * @return the stat returned by {@code setData}
+   * @throws InterruptedException if the ZooKeeper call is interrupted
+   */
+  public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) throws InterruptedException
+  {
+    final long begin = System.nanoTime();
+    try
+    {
+      final byte[] payload = _zkSerializer.serialize(datat, path);
+      final ZkConnection zkConnection = (ZkConnection) _connection;
+      return zkConnection.getZookeeper().setData(path, payload, expectedVersion);
+    }
+    catch (KeeperException e)
+    {
+      // surface ZooKeeper failures as the matching ZkException
+      throw ZkException.create(e);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("setData, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+  
+  /**
+   * Creates {@code path} with the serialized form of {@code data} through
+   * {@code retryUntilConnected}. Null data is passed on as null bytes without
+   * invoking the serializer. The elapsed time is logged at INFO level.
+   *
+   * @param path znode path; must not be null
+   * @param data object to store, or null
+   * @param mode create mode handed to the connection
+   * @return the value returned by the connection's {@code create}
+   * @throws NullPointerException if {@code path} is null
+   */
+  @Override
+  public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException,
+      IllegalArgumentException,
+      ZkException,
+      RuntimeException
+  {
+    if (path == null)
+    {
+      throw new NullPointerException("path must not be null.");
+    }
+
+    final long begin = System.nanoTime();
+    try
+    {
+      final byte[] payload;
+      if (data == null)
+      {
+        payload = null;
+      }
+      else
+      {
+        payload = serialize(data, path);
+      }
+
+      final Callable<String> createCall = new Callable<String>()
+      {
+        @Override
+        public String call() throws Exception
+        {
+          return _connection.create(path, payload, mode);
+        }
+      };
+      return retryUntilConnected(createCall);
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("create, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Deletes {@code path} through {@code retryUntilConnected}. The elapsed time is
+   * logged at INFO level.
+   *
+   * @param path znode path
+   * @return true when the delete call completed, false when it failed with
+   *         {@code ZkNoNodeException}
+   */
+  @Override
+  public boolean delete(final String path)
+  {
+    final long begin = System.nanoTime();
+    try
+    {
+      final Callable<Object> deleteCall = new Callable<Object>()
+      {
+        @Override
+        public Object call() throws Exception
+        {
+          _connection.delete(path);
+          return null;
+        }
+      };
+      retryUntilConnected(deleteCall);
+      return true;
+    }
+    catch (ZkNoNodeException e)
+    {
+      // nothing to delete
+      return false;
+    }
+    finally
+    {
+      final long elapsed = System.nanoTime() - begin;
+      LOG.info("delete, path: " + path + ", time: " + elapsed + " ns");
+    }
+  }
+
+  /**
+   * Issues an asynchronous ZooKeeper {@code create} with {@code Ids.OPEN_ACL_UNSAFE}.
+   * Null data is sent as null bytes without invoking the serializer.
+   *
+   * @param path  znode path
+   * @param datat object to store, or null
+   * @param mode  create mode
+   * @param cb    callback handed to the ZooKeeper call
+   */
+  public void asyncCreate(final String path,
+                          Object datat,
+                          CreateMode mode,
+                          CreateCallbackHandler cb)
+  {
+    final byte[] payload = (datat == null) ? null : serialize(datat, path);
+    final ZkConnection zkConnection = (ZkConnection) _connection;
+    zkConnection.getZookeeper().create(path, payload, Ids.OPEN_ACL_UNSAFE, mode, cb, null);
+  }
+
+  /**
+   * Serializes {@code datat} and issues an asynchronous ZooKeeper {@code setData}.
+   *
+   * @param path    znode path
+   * @param datat   object to write
+   * @param version version handed to {@code setData}
+   * @param cb      callback handed to the ZooKeeper call
+   */
+  public void asyncSetData(final String path,
+                           Object datat,
+                           int version,
+                           SetDataCallbackHandler cb)
+  {
+    final byte[] payload = serialize(datat, path);
+    final ZkConnection zkConnection = (ZkConnection) _connection;
+    zkConnection.getZookeeper().setData(path, payload, version, cb, null);
+  }
+
+  /**
+   * Issues an asynchronous ZooKeeper {@code getData} without a watcher.
+   *
+   * @param path znode path
+   * @param cb   callback handed to the ZooKeeper call
+   */
+  public void asyncGetData(final String path, GetDataCallbackHandler cb)
+  {
+    final ZkConnection zkConnection = (ZkConnection) _connection;
+    zkConnection.getZookeeper().getData(path, null, cb, null);
+  }
+
+  /**
+   * Issues an asynchronous ZooKeeper {@code exists} without a watcher.
+   *
+   * @param path znode path
+   * @param cb   callback handed to the ZooKeeper call
+   */
+  public void asyncExists(final String path, ExistsCallbackHandler cb)
+  {
+    final ZkConnection zkConnection = (ZkConnection) _connection;
+    zkConnection.getZookeeper().exists(path, null, cb, null);
+  }
+
+  /**
+   * Issues an asynchronous ZooKeeper {@code delete} with version {@code -1}.
+   *
+   * @param path znode path
+   * @param cb   callback handed to the ZooKeeper call
+   */
+  public void asyncDelete(String path, DeleteCallbackHandler cb)
+  {
+    final ZkConnection zkConnection = (ZkConnection) _connection;
+    zkConnection.getZookeeper().delete(path, -1, cb, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
new file mode 100644
index 0000000..d830449
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+/**
+ * Tracks the connection state reported by zkclient on behalf of a
+ * {@link ZKHelixManager}: whether the client is currently connected and whether the
+ * session has expired. New-session notifications are forwarded to the manager.
+ */
+public class ZkStateChangeListener implements IZkStateListener
+{
+  private static final Logger logger = Logger.getLogger(ZkStateChangeListener.class);
+
+  // both flags are volatile: written from the callbacks, read through the accessors
+  private volatile boolean _isConnected;
+  private volatile boolean _hasSessionExpired;
+  private final ZKHelixManager _zkHelixManager;
+
+  /**
+   * @param zkHelixManager manager whose connection is tracked and which is notified
+   *                       of new sessions
+   */
+  public ZkStateChangeListener(ZKHelixManager zkHelixManager)
+  {
+    this._zkHelixManager = zkHelixManager;
+  }
+
+  /**
+   * Marks the connection as connected and not expired, then forwards the
+   * notification to the manager.
+   */
+  @Override
+  public void handleNewSession()
+  {
+    // TODO:bug in zkclient .
+    // zkclient does not invoke handleStateChanged when a session expires but
+    // directly invokes handleNewSession
+    _isConnected = true;
+    _hasSessionExpired = false;
+    _zkHelixManager.handleNewSession();
+  }
+
+  /**
+   * Updates the connection flags for SyncConnected, Disconnected and Expired and
+   * logs the transition; other states are ignored.
+   *
+   * @param keeperState the new state reported by zkclient
+   */
+  @Override
+  public void handleStateChanged(KeeperState keeperState) throws Exception
+  {
+    switch (keeperState)
+    {
+    case SyncConnected:
+      // The handle is only needed for the log line. Do not cast blindly: a
+      // connection that is not a ZkConnection would throw ClassCastException here
+      // and leave _isConnected unset.
+      Object connection = _zkHelixManager._zkClient.getConnection();
+      Object zookeeper =
+          (connection instanceof ZkConnection) ? ((ZkConnection) connection).getZookeeper()
+              : connection;
+      logger.info("KeeperState: " + keeperState + ", zookeeper:" + zookeeper);
+      _isConnected = true;
+      break;
+    case Disconnected:
+      logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
+          + _zkHelixManager._sessionId + ", instance: "
+          + _zkHelixManager.getInstanceName() + ", type: "
+          + _zkHelixManager.getInstanceType());
+
+      _isConnected = false;
+      break;
+    case Expired:
+      logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
+          + _zkHelixManager._sessionId + ", instance: "
+          + _zkHelixManager.getInstanceName() + ", type: "
+          + _zkHelixManager.getInstanceType());
+
+      _isConnected = false;
+      _hasSessionExpired = true;
+      break;
+    default:
+      // other keeper states do not change the tracked flags
+      break;
+    }
+  }
+
+  /**
+   * @return true if the last recorded state is connected
+   */
+  boolean isConnected()
+  {
+    return _isConnected;
+  }
+
+  /**
+   * Marks the connection as disconnected without waiting for a callback.
+   */
+  void disconnect()
+  {
+    _isConnected = false;
+  }
+
+  /**
+   * @return true if an Expired state was recorded and no new session has been
+   *         reported since
+   */
+  boolean hasSessionExpired()
+  {
+    return _hasSessionExpired;
+  }
+}