You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[15/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
new file mode 100644
index 0000000..24b8cc3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -0,0 +1,1243 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+
+
+public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
+{
+ enum RetCode
+ {
+ OK, NODE_EXISTS, ERROR
+ }
+
+ private static Logger LOG = Logger.getLogger(ZkBaseDataAccessor.class);
+
+ private final ZkClient _zkClient;
+
+ public ZkBaseDataAccessor(ZkClient zkClient)
+ {
+ _zkClient = zkClient;
+ }
+
+ /**
+ * sync create
+ */
+ @Override
+ public boolean create(String path, T record, int options)
+ {
+ return create(path, record, null, options) == RetCode.OK;
+ }
+
+ /**
+ * sync create
+ */
+ public RetCode create(String path, T record, List<String> pathCreated, int options)
+ {
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid create mode. options: " + options);
+ return RetCode.ERROR;
+ }
+
+ boolean retry;
+ do
+ {
+ retry = false;
+ try
+ {
+ _zkClient.create(path, record, mode);
+ if (pathCreated != null)
+ pathCreated.add(path);
+
+ return RetCode.OK;
+ }
+ catch (ZkNoNodeException e)
+ {
+ // this will happen if parent node does not exist
+ String parentPath = new File(path).getParent();
+ try
+ {
+ RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
+ if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
+ {
+ // if parent node created/exists, retry
+ retry = true;
+ }
+ }
+ catch (Exception e1)
+ {
+ LOG.error("Exception while creating path: " + parentPath, e1);
+ return RetCode.ERROR;
+ }
+ }
+ catch (ZkNodeExistsException e)
+ {
+ LOG.warn("Node already exists. path: " + path);
+ return RetCode.NODE_EXISTS;
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while creating path: " + path, e);
+ return RetCode.ERROR;
+ }
+ }
+ while (retry);
+
+ return RetCode.OK;
+ }
+
+ /**
+ * sync set
+ */
+ @Override
+ public boolean set(String path, T record, int options)
+ {
+ return set(path, record, null, null, -1, options);
+ }
+
+ /**
+ * sync set
+ *
+ * @param setstat
+ * : if node is created instead of set, stat will NOT be set
+ */
+ public boolean set(String path,
+ T record,
+ List<String> pathsCreated,
+ Stat setstat,
+ int expectVersion,
+ int options)
+ {
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid set mode. options: " + options);
+ return false;
+ }
+
+ boolean retry;
+ do
+ {
+ retry = false;
+ try
+ {
+ // _zkClient.writeData(path, record);
+ Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
+ if (setstat != null)
+ DataTree.copyStat(setStat, setstat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ // node not exists, try create. in this case, stat will not be set
+ try
+ {
+ RetCode rc = create(path, record, pathsCreated, options);
+ // if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
+ // retry = true;
+ switch (rc)
+ {
+ case OK:
+ // not set stat if node is created (instead of set)
+ break;
+ case NODE_EXISTS:
+ retry = true;
+ break;
+ default:
+ LOG.error("Fail to set path by creating: " + path);
+ return false;
+ }
+ }
+ catch (Exception e1)
+ {
+ LOG.error("Exception while setting path by creating: " + path, e);
+ return false;
+ }
+ }
+ catch (ZkBadVersionException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while setting path: " + path, e);
+ return false;
+ }
+ }
+ while (retry);
+
+ return true;
+ }
+
+ /**
+ * sync update
+ */
+ @Override
+ public boolean update(String path, DataUpdater<T> updater, int options)
+ {
+ return update(path, updater, null, null, options) != null;
+ }
+
+ /**
+ * sync update
+ *
+ * @return: updatedData on success, or null on fail
+ */
+ public T update(String path,
+ DataUpdater<T> updater,
+ List<String> createPaths,
+ Stat stat,
+ int options)
+ {
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid update mode. options: " + options);
+ return null;
+ }
+
+ boolean retry;
+ T updatedData = null;
+ do
+ {
+ retry = false;
+ try
+ {
+ Stat readStat = new Stat();
+ T oldData = (T) _zkClient.readData(path, readStat);
+ T newData = updater.update(oldData);
+ Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
+ if (stat != null)
+ {
+ DataTree.copyStat(setStat, stat);
+ }
+
+ updatedData = newData;
+ }
+ catch (ZkBadVersionException e)
+ {
+ retry = true;
+ }
+ catch (ZkNoNodeException e)
+ {
+ // node not exist, try create
+ try
+ {
+ T newData = updater.update(null);
+ RetCode rc = create(path, newData, createPaths, options);
+ switch (rc)
+ {
+ case OK:
+ updatedData = newData;
+ break;
+ case NODE_EXISTS:
+ retry = true;
+ break;
+ default:
+ LOG.error("Fail to update path by creating: " + path);
+ return null;
+ }
+ }
+ catch (Exception e1)
+ {
+ LOG.error("Exception while updating path by creating: " + path, e1);
+ return null;
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while updating path: " + path, e);
+ return null;
+ }
+ }
+ while (retry);
+
+ return updatedData;
+ }
+
+ /**
+ * sync get
+ *
+ */
+ @Override
+ public T get(String path, Stat stat, int options)
+ {
+ T data = null;
+ try
+ {
+ data = (T) _zkClient.readData(path, stat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ if (AccessOption.isThrowExceptionIfNotExist(options))
+ {
+ throw e;
+ }
+ }
+ return data;
+ }
+
+ /**
+ * async get
+ *
+ */
+ @Override
+ public List<T> get(List<String> paths, List<Stat> stats, int options)
+ {
+ boolean[] needRead = new boolean[paths.size()];
+ Arrays.fill(needRead, true);
+
+ return get(paths, stats, needRead);
+ }
+
+ /**
+ * async get
+ */
+ List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
+ {
+ if (paths == null || paths.size() == 0)
+ {
+ return Collections.emptyList();
+ }
+
+ // init stats
+ if (stats != null)
+ {
+ stats.clear();
+ stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
+ }
+
+ long startT = System.nanoTime();
+
+ try
+ {
+ // issue asyn get requests
+ GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needRead[i])
+ continue;
+
+ String path = paths.get(i);
+ cbList[i] = new GetDataCallbackHandler();
+ _zkClient.asyncGetData(path, cbList[i]);
+ }
+
+ // wait for completion
+ for (int i = 0; i < cbList.length; i++)
+ {
+ if (!needRead[i])
+ continue;
+
+ GetDataCallbackHandler cb = cbList[i];
+ cb.waitForSuccess();
+ }
+
+ // construct return results
+ List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needRead[i])
+ continue;
+
+ GetDataCallbackHandler cb = cbList[i];
+ if (Code.get(cb.getRc()) == Code.OK)
+ {
+ @SuppressWarnings("unchecked")
+ T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
+ records.set(i, record);
+ if (stats != null)
+ {
+ stats.set(i, cb._stat);
+ }
+ }
+ }
+
+ return records;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+ }
+
+ /**
+ * asyn getChildren
+ *
+ */
+ @Override
+ public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+ {
+ try
+ {
+ // prepare child paths
+ List<String> childNames = getChildNames(parentPath, options);
+ if (childNames == null || childNames.size() == 0)
+ {
+ return Collections.emptyList();
+ }
+
+ List<String> paths = new ArrayList<String>();
+ for (String childName : childNames)
+ {
+ String path = parentPath + "/" + childName;
+ paths.add(path);
+ }
+
+ // remove null record
+ List<Stat> curStats = new ArrayList<Stat>(paths.size());
+ List<T> records = get(paths, curStats, options);
+ Iterator<T> recordIter = records.iterator();
+ Iterator<Stat> statIter = curStats.iterator();
+ while (statIter.hasNext())
+ {
+ recordIter.next();
+ if (statIter.next() == null)
+ {
+ statIter.remove();
+ recordIter.remove();
+ }
+ }
+
+ if (stats != null)
+ {
+ stats.clear();
+ stats.addAll(curStats);
+ }
+
+ return records;
+ }
+ catch (ZkNoNodeException e)
+ {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * sync getChildNames
+ *
+ * @return null if parentPath doesn't exist
+ */
+ @Override
+ public List<String> getChildNames(String parentPath, int options)
+ {
+ try
+ {
+ List<String> childNames = _zkClient.getChildren(parentPath);
+ Collections.sort(childNames);
+ return childNames;
+ }
+ catch (ZkNoNodeException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * sync exists
+ *
+ */
+ @Override
+ public boolean exists(String path, int options)
+ {
+ return _zkClient.exists(path);
+ }
+
+ /**
+ * sync getStat
+ *
+ */
+ @Override
+ public Stat getStat(String path, int options)
+ {
+ return _zkClient.getStat(path);
+ }
+
+ /**
+ * sync remove
+ *
+ */
+ @Override
+ public boolean remove(String path, int options)
+ {
+ try
+ {
+ // optimize on common path
+ _zkClient.delete(path);
+ }
+ catch (ZkException e)
+ {
+ _zkClient.deleteRecursive(path);
+ }
+ return true;
+ }
+
+ /**
+ * async create. give up on error other than NONODE
+ *
+ */
+ CreateCallbackHandler[] create(List<String> paths,
+ List<T> records,
+ boolean[] needCreate,
+ List<List<String>> pathsCreated,
+ int options)
+ {
+ if ((records != null && records.size() != paths.size())
+ || needCreate.length != paths.size()
+ || (pathsCreated != null && pathsCreated.size() != paths.size()))
+ {
+ throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
+ }
+
+ CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
+
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid async set mode. options: " + options);
+ return cbList;
+ }
+
+ boolean retry;
+ do
+ {
+ retry = false;
+
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needCreate[i])
+ continue;
+
+ String path = paths.get(i);
+ T record = records == null ? null : records.get(i);
+ cbList[i] = new CreateCallbackHandler();
+ _zkClient.asyncCreate(path, record, mode, cbList[i]);
+ }
+
+ List<String> parentPaths =
+ new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
+ boolean failOnNoNode = false;
+
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needCreate[i])
+ continue;
+
+ CreateCallbackHandler cb = cbList[i];
+ cb.waitForSuccess();
+ String path = paths.get(i);
+
+ if (Code.get(cb.getRc()) == Code.NONODE)
+ {
+ String parentPath = new File(path).getParent();
+ parentPaths.set(i, parentPath);
+ failOnNoNode = true;
+ }
+ else
+ {
+ // if create succeed or fail on error other than NONODE,
+ // give up
+ needCreate[i] = false;
+
+ // if succeeds, record what paths we've created
+ if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
+ {
+ if (pathsCreated.get(i) == null)
+ {
+ pathsCreated.set(i, new ArrayList<String>());
+ }
+ pathsCreated.get(i).add(path);
+ }
+ }
+ }
+
+ if (failOnNoNode)
+ {
+ boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
+
+ CreateCallbackHandler[] parentCbList =
+ create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+ for (int i = 0; i < parentCbList.length; i++)
+ {
+ CreateCallbackHandler parentCb = parentCbList[i];
+ if (parentCb == null)
+ continue;
+
+ Code rc = Code.get(parentCb.getRc());
+
+ // if parent is created, retry create child
+ if (rc == Code.OK || rc == Code.NODEEXISTS)
+ {
+ retry = true;
+ break;
+ }
+ }
+ }
+ }
+ while (retry);
+
+ return cbList;
+ }
+
+
+ /**
+ * async create
+ *
+ * TODO: rename to create
+ */
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options)
+ {
+ boolean[] success = new boolean[paths.size()];
+
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid async create mode. options: " + options);
+ return success;
+ }
+
+ boolean[] needCreate = new boolean[paths.size()];
+ Arrays.fill(needCreate, true);
+ List<List<String>> pathsCreated =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
+
+ long startT = System.nanoTime();
+ try
+ {
+
+ CreateCallbackHandler[] cbList =
+ create(paths, records, needCreate, pathsCreated, options);
+
+ for (int i = 0; i < cbList.length; i++)
+ {
+ CreateCallbackHandler cb = cbList[i];
+ success[i] = (Code.get(cb.getRc()) == Code.OK);
+ }
+
+ return success;
+
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+ }
+
+ /**
+ * async set
+ *
+ * TODO: rename to set
+ *
+ */
+ @Override
+ public boolean[] setChildren(List<String> paths, List<T> records, int options)
+ {
+ return set(paths, records, null, null, options);
+ }
+
+ /**
+ * async set, give up on error other than NoNode
+ *
+ */
+ boolean[] set(List<String> paths,
+ List<T> records,
+ List<List<String>> pathsCreated,
+ List<Stat> stats,
+ int options)
+ {
+ if (paths == null || paths.size() == 0)
+ {
+ return new boolean[0];
+ }
+
+ if ((records != null && records.size() != paths.size())
+ || (pathsCreated != null && pathsCreated.size() != paths.size()))
+ {
+ throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
+ }
+
+ boolean[] success = new boolean[paths.size()];
+
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid async set mode. options: " + options);
+ return success;
+ }
+
+ List<Stat> setStats =
+ new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+ SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
+ CreateCallbackHandler[] createCbList = null;
+ boolean[] needSet = new boolean[paths.size()];
+ Arrays.fill(needSet, true);
+
+ long startT = System.nanoTime();
+
+ try
+ {
+ boolean retry;
+ do
+ {
+ retry = false;
+
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needSet[i])
+ continue;
+
+ String path = paths.get(i);
+ T record = records.get(i);
+ cbList[i] = new SetDataCallbackHandler();
+ _zkClient.asyncSetData(path, record, -1, cbList[i]);
+
+ }
+
+ boolean failOnNoNode = false;
+
+ for (int i = 0; i < cbList.length; i++)
+ {
+ SetDataCallbackHandler cb = cbList[i];
+ cb.waitForSuccess();
+ Code rc = Code.get(cb.getRc());
+ switch (rc)
+ {
+ case OK:
+ setStats.set(i, cb.getStat());
+ needSet[i] = false;
+ break;
+ case NONODE:
+ // if fail on NoNode, try create the node
+ failOnNoNode = true;
+ break;
+ default:
+ // if fail on error other than NoNode, give up
+ needSet[i] = false;
+ break;
+ }
+ }
+
+ // if failOnNoNode, try create
+ if (failOnNoNode)
+ {
+ boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
+ createCbList = create(paths, records, needCreate, pathsCreated, options);
+ for (int i = 0; i < createCbList.length; i++)
+ {
+ CreateCallbackHandler createCb = createCbList[i];
+ if (createCb == null)
+ {
+ continue;
+ }
+
+ Code rc = Code.get(createCb.getRc());
+ switch (rc)
+ {
+ case OK:
+ setStats.set(i, ZNode.ZERO_STAT);
+ needSet[i] = false;
+ break;
+ case NODEEXISTS:
+ retry = true;
+ break;
+ default:
+ // if creation fails on error other than NodeExists
+ // no need to retry set
+ needSet[i] = false;
+ break;
+ }
+ }
+ }
+ }
+ while (retry);
+
+ // construct return results
+ for (int i = 0; i < cbList.length; i++)
+ {
+ SetDataCallbackHandler cb = cbList[i];
+
+ Code rc = Code.get(cb.getRc());
+ if (rc == Code.OK)
+ {
+ success[i] = true;
+ }
+ else if (rc == Code.NONODE)
+ {
+ CreateCallbackHandler createCb = createCbList[i];
+ if (Code.get(createCb.getRc()) == Code.OK)
+ {
+ success[i] = true;
+ }
+ }
+ }
+
+ if (stats != null)
+ {
+ stats.clear();
+ stats.addAll(setStats);
+ }
+
+ return success;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+ }
+
+ // TODO: rename to update
+ /**
+ * async update
+ */
+ @Override
+ public boolean[] updateChildren(List<String> paths,
+ List<DataUpdater<T>> updaters,
+ int options)
+ {
+
+ List<T> updateData = update(paths, updaters, null, null, options);
+ boolean[] success = new boolean[paths.size()]; // init to false
+ for (int i = 0; i < paths.size(); i++)
+ {
+ T data = updateData.get(i);
+ success[i] = (data != null);
+ }
+ return success;
+ }
+
+ /**
+ * async update
+ *
+ * return: updatedData on success or null on fail
+ */
+ List<T> update(List<String> paths,
+ List<DataUpdater<T>> updaters,
+ List<List<String>> pathsCreated,
+ List<Stat> stats,
+ int options)
+ {
+ if (paths == null || paths.size() == 0)
+ {
+ LOG.error("paths is null or empty");
+ return Collections.emptyList();
+ }
+
+ if (updaters.size() != paths.size()
+ || (pathsCreated != null && pathsCreated.size() != paths.size()))
+ {
+ throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
+ }
+
+ List<Stat> setStats =
+ new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+ List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+
+ CreateMode mode = AccessOption.getMode(options);
+ if (mode == null)
+ {
+ LOG.error("Invalid update mode. options: " + options);
+ return updateData;
+ }
+
+ SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
+ CreateCallbackHandler[] createCbList = null;
+ boolean[] needUpdate = new boolean[paths.size()];
+ Arrays.fill(needUpdate, true);
+
+ long startT = System.nanoTime();
+
+ try
+ {
+ boolean retry;
+ do
+ {
+ retry = false;
+ boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
+ boolean failOnNoNode = false;
+
+ // asycn read all data
+ List<Stat> curStats = new ArrayList<Stat>();
+ List<T> curDataList =
+ get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
+
+ // async update
+ List<T> newDataList = new ArrayList<T>();
+ for (int i = 0; i < paths.size(); i++)
+ {
+ if (!needUpdate[i])
+ {
+ newDataList.add(null);
+ continue;
+ }
+ String path = paths.get(i);
+ DataUpdater<T> updater = updaters.get(i);
+ T newData = updater.update(curDataList.get(i));
+ newDataList.add(newData);
+ Stat curStat = curStats.get(i);
+ if (curStat == null)
+ {
+ // node not exists
+ failOnNoNode = true;
+ needCreate[i] = true;
+ }
+ else
+ {
+ cbList[i] = new SetDataCallbackHandler();
+ _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
+ }
+ }
+
+ // wait for completion
+ boolean failOnBadVersion = false;
+
+ for (int i = 0; i < paths.size(); i++)
+ {
+ SetDataCallbackHandler cb = cbList[i];
+ if (cb == null)
+ continue;
+
+ cb.waitForSuccess();
+
+ switch (Code.get(cb.getRc()))
+ {
+ case OK:
+ updateData.set(i, newDataList.get(i));
+ setStats.set(i, cb.getStat());
+ needUpdate[i] = false;
+ break;
+ case NONODE:
+ failOnNoNode = true;
+ needCreate[i] = true;
+ break;
+ case BADVERSION:
+ failOnBadVersion = true;
+ break;
+ default:
+ // if fail on error other than NoNode or BadVersion
+ // will not retry
+ needUpdate[i] = false;
+ break;
+ }
+ }
+
+ // if failOnNoNode, try create
+ if (failOnNoNode)
+ {
+ createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
+ for (int i = 0; i < paths.size(); i++)
+ {
+ CreateCallbackHandler createCb = createCbList[i];
+ if (createCb == null)
+ {
+ continue;
+ }
+
+ switch (Code.get(createCb.getRc()))
+ {
+ case OK:
+ needUpdate[i] = false;
+ updateData.set(i, newDataList.get(i));
+ setStats.set(i, ZNode.ZERO_STAT);
+ break;
+ case NODEEXISTS:
+ retry = true;
+ break;
+ default:
+ // if fail on error other than NodeExists
+ // will not retry
+ needUpdate[i] = false;
+ break;
+ }
+ }
+ }
+
+ // if failOnBadVersion, retry
+ if (failOnBadVersion)
+ {
+ retry = true;
+ }
+ }
+ while (retry);
+
+ if (stats != null)
+ {
+ stats.clear();
+ stats.addAll(setStats);
+ }
+
+ return updateData;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+
+ }
+
+ /**
+ * async exists
+ *
+ */
+ @Override
+ public boolean[] exists(List<String> paths, int options)
+ {
+ Stat[] stats = getStats(paths, options);
+
+ boolean[] exists = new boolean[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ exists[i] = (stats[i] != null);
+ }
+
+ return exists;
+ }
+
+ /**
+ * async getStat
+ *
+ */
+ @Override
+ public Stat[] getStats(List<String> paths, int options)
+ {
+ if (paths == null || paths.size() == 0)
+ {
+ LOG.error("paths is null or empty");
+ return new Stat[0];
+ }
+
+ Stat[] stats = new Stat[paths.size()];
+
+ long startT = System.nanoTime();
+
+ try
+ {
+ ExistsCallbackHandler[] cbList = new ExistsCallbackHandler[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ String path = paths.get(i);
+ cbList[i] = new ExistsCallbackHandler();
+ _zkClient.asyncExists(path, cbList[i]);
+ }
+
+ for (int i = 0; i < cbList.length; i++)
+ {
+ ExistsCallbackHandler cb = cbList[i];
+ cb.waitForSuccess();
+ stats[i] = cb._stat;
+ }
+
+ return stats;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+ }
+
+ /**
+ * async remove
+ *
+ */
+ @Override
+ public boolean[] remove(List<String> paths, int options)
+ {
+ if (paths == null || paths.size() == 0)
+ {
+ return new boolean[0];
+ }
+
+ boolean[] success = new boolean[paths.size()];
+
+ DeleteCallbackHandler[] cbList = new DeleteCallbackHandler[paths.size()];
+
+ long startT = System.nanoTime();
+
+ try
+ {
+ for (int i = 0; i < paths.size(); i++)
+ {
+ String path = paths.get(i);
+ cbList[i] = new DeleteCallbackHandler();
+ _zkClient.asyncDelete(path, cbList[i]);
+ }
+
+ for (int i = 0; i < cbList.length; i++)
+ {
+ DeleteCallbackHandler cb = cbList[i];
+ cb.waitForSuccess();
+ success[i] = (cb.getRc() == 0);
+ }
+
+ return success;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
+ + ",... time: " + (endT - startT) + " ns");
+ }
+ }
+
+ /**
+ * Subscribe to zookeeper data changes
+ */
+ @Override
+ public void subscribeDataChanges(String path, IZkDataListener listener)
+ {
+ _zkClient.subscribeDataChanges(path, listener);
+ }
+
+ /**
+ * Unsubscribe to zookeeper data changes
+ */
+ @Override
+ public void unsubscribeDataChanges(String path, IZkDataListener dataListener)
+ {
+ _zkClient.unsubscribeDataChanges(path, dataListener);
+ }
+
+ /**
+ * Subscrie to zookeeper data changes
+ */
+ @Override
+ public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+ {
+ return _zkClient.subscribeChildChanges(path, listener);
+ }
+
+ /**
+ * Unsubscrie to zookeeper data changes
+ */
+ @Override
+ public void unsubscribeChildChanges(String path, IZkChildListener childListener)
+ {
+ _zkClient.unsubscribeChildChanges(path, childListener);
+ }
+
+ // simple test
+ public static void main(String[] args)
+ {
+ ZkClient zkclient = new ZkClient("localhost:2191");
+ zkclient.setZkSerializer(new ZNRecordSerializer());
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
+
+ // test async create
+ List<String> createPaths =
+ Arrays.asList("/test/child1/child1", "/test/child2/child2");
+ List<ZNRecord> createRecords =
+ Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
+
+ boolean[] needCreate = new boolean[createPaths.size()];
+ Arrays.fill(needCreate, true);
+ List<List<String>> pathsCreated =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
+ null));
+ accessor.create(createPaths,
+ createRecords,
+ needCreate,
+ pathsCreated,
+ AccessOption.PERSISTENT);
+ System.out.println("pathsCreated: " + pathsCreated);
+
+ // test async set
+ List<String> setPaths =
+ Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
+ List<ZNRecord> setRecords =
+ Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
+
+ pathsCreated =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
+ null));
+ boolean[] success =
+ accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
+ System.out.println("pathsCreated: " + pathsCreated);
+ System.out.println("setSuccess: " + Arrays.toString(success));
+
+ // test async update
+ List<String> updatePaths =
+ Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
+ class TestUpdater implements DataUpdater<ZNRecord>
+ {
+ final ZNRecord _newData;
+
+ public TestUpdater(ZNRecord newData)
+ {
+ _newData = newData;
+ }
+
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ return _newData;
+
+ }
+ }
+ List<DataUpdater<ZNRecord>> updaters =
+ Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
+ (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
+
+ pathsCreated =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
+ null));
+
+ List<ZNRecord> updateRecords =
+ accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
+ for (int i = 0; i < updatePaths.size(); i++)
+ {
+ success[i] = updateRecords.get(i) != null;
+ }
+ System.out.println("pathsCreated: " + pathsCreated);
+ System.out.println("updateSuccess: " + Arrays.toString(success));
+
+ System.out.println("CLOSING");
+ zkclient.close();
+ }
+
+ /**
+ * Reset
+ */
+ @Override
+ public void reset()
+ {
+ // Nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
new file mode 100644
index 0000000..eafe7c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -0,0 +1,984 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+
+
+public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
+{
+ private static final Logger LOG =
+ Logger.getLogger(ZkCacheBaseDataAccessor.class);
+
+ protected WriteThroughCache<T> _wtCache;
+ protected ZkCallbackCache<T> _zkCache;
+
+ final ZkBaseDataAccessor<T> _baseAccessor;
+ final Map<String, Cache<T>> _cacheMap;
+
+ final String _chrootPath;
+ final List<String> _wtCachePaths;
+ final List<String> _zkCachePaths;
+
+ final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
+
+ // fire listeners
+ private final ReentrantLock _eventLock = new ReentrantLock();
+ private ZkCacheEventThread _eventThread;
+
+ private ZkClient _zkclient = null;
+
+ public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
+ List<String> wtCachePaths)
+ {
+ this(baseAccessor, null, wtCachePaths, null);
+ }
+
+ public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
+ String chrootPath,
+ List<String> wtCachePaths,
+ List<String> zkCachePaths)
+ {
+ _baseAccessor = baseAccessor;
+
+ if (chrootPath == null || chrootPath.equals("/"))
+ {
+ _chrootPath = null;
+ }
+ else
+ {
+ PathUtils.validatePath(chrootPath);
+ _chrootPath = chrootPath;
+ }
+
+ _wtCachePaths = wtCachePaths;
+ _zkCachePaths = zkCachePaths;
+
+ // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+ // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
+ // comes first
+ _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
+ {
+ @Override
+ public int compare(String o1, String o2)
+ {
+ int len1 = o1.split("/").length;
+ int len2 = o2.split("/").length;
+ return len1 - len2;
+ }
+ });
+
+ start();
+ }
+
+ public ZkCacheBaseDataAccessor(String zkAddress,
+ ZkSerializer serializer,
+ String chrootPath,
+ List<String> wtCachePaths,
+ List<String> zkCachePaths)
+ {
+ _zkclient =
+ new ZkClient(zkAddress,
+ ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ serializer);
+ _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
+
+ if (chrootPath == null || chrootPath.equals("/"))
+ {
+ _chrootPath = null;
+ }
+ else
+ {
+ PathUtils.validatePath(chrootPath);
+ _chrootPath = chrootPath;
+ }
+
+ _wtCachePaths = wtCachePaths;
+ _zkCachePaths = zkCachePaths;
+
+ // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+ // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
+ // comes first
+ _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
+ {
+ @Override
+ public int compare(String o1, String o2)
+ {
+ int len1 = o1.split("/").length;
+ int len2 = o2.split("/").length;
+ return len1 - len2;
+ }
+ });
+
+ start();
+ }
+
+ private String prependChroot(String clientPath)
+ {
+ if (_chrootPath != null)
+ {
+ // handle clientPath = "/"
+ if (clientPath.length() == 1)
+ {
+ return _chrootPath;
+ }
+ return _chrootPath + clientPath;
+ }
+ else
+ {
+ return clientPath;
+ }
+ }
+
+ private List<String> prependChroot(List<String> clientPaths)
+ {
+ List<String> serverPaths = new ArrayList<String>();
+ for (String clientPath : clientPaths)
+ {
+ serverPaths.add(prependChroot(clientPath));
+ }
+ return serverPaths;
+ }
+
+ /**
+ * find the first path in paths that is a descendant
+ */
+ private String firstCachePath(List<String> paths)
+ {
+ for (String cachePath : _cacheMap.keySet())
+ {
+ for (String path : paths)
+ {
+ if (path.startsWith(cachePath))
+ {
+ return path;
+ }
+ }
+ }
+ return null;
+ }
+
+ private Cache<T> getCache(String path)
+ {
+ for (String cachePath : _cacheMap.keySet())
+ {
+ if (path.startsWith(cachePath))
+ {
+ return _cacheMap.get(cachePath);
+ }
+ }
+
+ return null;
+ }
+
+ private Cache<T> getCache(List<String> paths)
+ {
+ Cache<T> cache = null;
+ for (String path : paths)
+ {
+ for (String cachePath : _cacheMap.keySet())
+ {
+ if (cache == null && path.startsWith(cachePath))
+ {
+ cache = _cacheMap.get(cachePath);
+ }
+ else if (cache != null && cache != _cacheMap.get(cachePath))
+ {
+ throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
+ + paths);
+ }
+ }
+ }
+
+ return cache;
+ }
+
+ private void updateCache(Cache<T> cache,
+ List<String> createPaths,
+ boolean success,
+ String updatePath,
+ T data,
+ Stat stat)
+ {
+ if (createPaths == null || createPaths.isEmpty())
+ {
+ if (success)
+ {
+ cache.update(updatePath, data, stat);
+ }
+ }
+ else
+ {
+ String firstPath = firstCachePath(createPaths);
+ if (firstPath != null)
+ {
+ cache.updateRecursive(firstPath);
+ }
+ }
+ }
+
+ @Override
+ public boolean create(String path, T data, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+ List<String> pathsCreated = new ArrayList<String>();
+ RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
+ boolean success = (rc == RetCode.OK);
+
+ updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.create(serverPath, data, options);
+ }
+
+ @Override
+ public boolean set(String path, T data, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+ Stat setStat = new Stat();
+ List<String> pathsCreated = new ArrayList<String>();
+ boolean success =
+ _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
+
+ updateCache(cache, pathsCreated, success, serverPath, data, setStat);
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.set(serverPath, data, options);
+ }
+
+ @Override
+ public boolean update(String path, DataUpdater<T> updater, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+ Stat setStat = new Stat();
+ List<String> pathsCreated = new ArrayList<String>();
+ T updateData =
+ _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
+ boolean success = (updateData != null);
+ updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
+ // return _baseAccessor.update(serverPath, updater, options);
+ }
+
+ @Override
+ public boolean exists(String path, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ boolean exists = cache.exists(serverPath);
+ if (exists)
+ {
+ return true;
+ }
+ }
+
+ // if not exists in cache, always fall back to zk
+ return _baseAccessor.exists(serverPath, options);
+ }
+
+ @Override
+ public boolean remove(String path, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+
+ boolean success = _baseAccessor.remove(serverPath, options);
+ if (success)
+ {
+ cache.purgeRecursive(serverPath);
+ }
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.remove(serverPath, options);
+ }
+
+ @Override
+ public T get(String path, Stat stat, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ T record = null;
+ ZNode znode = cache.get(serverPath);
+
+ if (znode != null)
+ {
+ // TODO: shall return a deep copy instead of reference
+ record = ((T) znode.getData());
+ if (stat != null)
+ {
+ DataTree.copyStat(znode.getStat(), stat);
+ }
+ return record;
+
+ }
+ else
+ {
+ // if cache miss, fall back to zk and update cache
+ try
+ {
+ cache.lockWrite();
+ record = _baseAccessor.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+ cache.update(serverPath, record, stat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ if (AccessOption.isThrowExceptionIfNotExist(options))
+ {
+ throw e;
+ }
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+
+ return record;
+ }
+ }
+
+ // no cache
+ return _baseAccessor.get(serverPath, stat, options);
+ }
+
+ @Override
+ public Stat getStat(String path, int options)
+ {
+ String clientPath = path;
+ String serverPath = prependChroot(clientPath);
+
+ Cache<T> cache = getCache(serverPath);
+ if (cache != null)
+ {
+ Stat stat = new Stat();
+ ZNode znode = cache.get(serverPath);
+
+ if (znode != null)
+ {
+ return znode.getStat();
+
+ }
+ else
+ {
+ // if cache miss, fall back to zk and update cache
+ try
+ {
+ cache.lockWrite();
+ T data = _baseAccessor.get(serverPath, stat, options);
+ cache.update(serverPath, data, stat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ return null;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+
+ return stat;
+ }
+ }
+
+ // no cache
+ return _baseAccessor.getStat(serverPath, options);
+ }
+
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options)
+ {
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ Cache<T> cache = getCache(serverPaths);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+ boolean[] needCreate = new boolean[size];
+ Arrays.fill(needCreate, true);
+ List<List<String>> pathsCreatedList =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ CreateCallbackHandler[] createCbList =
+ _baseAccessor.create(serverPaths,
+ records,
+ needCreate,
+ pathsCreatedList,
+ options);
+
+ boolean[] success = new boolean[size];
+ for (int i = 0; i < size; i++)
+ {
+ CreateCallbackHandler cb = createCbList[i];
+ success[i] = (Code.get(cb.getRc()) == Code.OK);
+
+ updateCache(cache,
+ pathsCreatedList.get(i),
+ success[i],
+ serverPaths.get(i),
+ records.get(i),
+ ZNode.ZERO_STAT);
+ }
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.createChildren(serverPaths, records, options);
+ }
+
+ @Override
+ public boolean[] setChildren(List<String> paths, List<T> records, int options)
+ {
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ Cache<T> cache = getCache(serverPaths);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+ List<Stat> setStats = new ArrayList<Stat>();
+ List<List<String>> pathsCreatedList =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ boolean[] success =
+ _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
+
+ for (int i = 0; i < size; i++)
+ {
+ updateCache(cache,
+ pathsCreatedList.get(i),
+ success[i],
+ serverPaths.get(i),
+ records.get(i),
+ setStats.get(i));
+ }
+
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ return _baseAccessor.setChildren(serverPaths, records, options);
+ }
+
+ @Override
+ public boolean[] updateChildren(List<String> paths,
+ List<DataUpdater<T>> updaters,
+ int options)
+ {
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ Cache<T> cache = getCache(serverPaths);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+
+ List<Stat> setStats = new ArrayList<Stat>();
+ boolean[] success = new boolean[size];
+ List<List<String>> pathsCreatedList =
+ new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ List<T> updateData =
+ _baseAccessor.update(serverPaths,
+ updaters,
+ pathsCreatedList,
+ setStats,
+ options);
+
+ // System.out.println("updateChild: ");
+ // for (T data : updateData)
+ // {
+ // System.out.println(data);
+ // }
+
+ for (int i = 0; i < size; i++)
+ {
+ success[i] = (updateData.get(i) != null);
+ updateCache(cache,
+ pathsCreatedList.get(i),
+ success[i],
+ serverPaths.get(i),
+ updateData.get(i),
+ setStats.get(i));
+ }
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.updateChildren(serverPaths, updaters, options);
+ }
+
+ // TODO: change to use async_exists
+ @Override
+ public boolean[] exists(List<String> paths, int options)
+ {
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ boolean exists[] = new boolean[size];
+ for (int i = 0; i < size; i++)
+ {
+ exists[i] = exists(serverPaths.get(i), options);
+ }
+ return exists;
+ }
+
+ @Override
+ public boolean[] remove(List<String> paths, int options)
+ {
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ Cache<T> cache = getCache(serverPaths);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockWrite();
+
+ boolean[] success = _baseAccessor.remove(serverPaths, options);
+
+ for (int i = 0; i < size; i++)
+ {
+ if (success[i])
+ {
+ cache.purgeRecursive(serverPaths.get(i));
+ }
+ }
+ return success;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ // no cache
+ return _baseAccessor.remove(serverPaths, options);
+ }
+
+ @Override
+ public List<T> get(List<String> paths, List<Stat> stats, int options)
+ {
+ if (paths == null || paths.isEmpty())
+ {
+ return Collections.emptyList();
+ }
+
+ final int size = paths.size();
+ List<String> serverPaths = prependChroot(paths);
+
+ List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
+ List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
+
+ boolean needRead = false;
+ boolean needReads[] = new boolean[size]; // init to false
+
+ Cache<T> cache = getCache(serverPaths);
+ if (cache != null)
+ {
+ try
+ {
+ cache.lockRead();
+ for (int i = 0; i < size; i++)
+ {
+ ZNode zNode = cache.get(serverPaths.get(i));
+ if (zNode != null)
+ {
+ // TODO: shall return a deep copy instead of reference
+ records.set(i, (T) zNode.getData());
+ readStats.set(i, zNode.getStat());
+ }
+ else
+ {
+ needRead = true;
+ needReads[i] = true;
+ }
+ }
+ }
+ finally
+ {
+ cache.unlockRead();
+ }
+
+ // cache miss, fall back to zk and update cache
+ if (needRead)
+ {
+ cache.lockWrite();
+ try
+ {
+ List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
+ for (int i = 0; i < size; i++)
+ {
+ if (needReads[i])
+ {
+ records.set(i, readRecords.get(i));
+ cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
+ }
+ }
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+
+ if (stats != null)
+ {
+ stats.clear();
+ stats.addAll(readStats);
+ }
+
+ return records;
+ }
+
+ // no cache
+ return _baseAccessor.get(serverPaths, stats, options);
+ }
+
+ // TODO: add cache
+ @Override
+ public Stat[] getStats(List<String> paths, int options)
+ {
+ List<String> serverPaths = prependChroot(paths);
+ return _baseAccessor.getStats(serverPaths, options);
+ }
+
+ @Override
+ public List<String> getChildNames(String parentPath, int options)
+ {
+ String serverParentPath = prependChroot(parentPath);
+
+ Cache<T> cache = getCache(serverParentPath);
+ if (cache != null)
+ {
+ // System.out.println("zk-cache");
+ ZNode znode = cache.get(serverParentPath);
+
+ if (znode != null && znode.getChildSet() != Collections.<String> emptySet())
+ {
+ // System.out.println("zk-cache-hit: " + parentPath);
+ List<String> childNames = new ArrayList<String>(znode.getChildSet());
+ Collections.sort(childNames);
+ return childNames;
+ }
+ else
+ {
+ // System.out.println("zk-cache-miss");
+ try
+ {
+ cache.lockWrite();
+
+ List<String> childNames =
+ _baseAccessor.getChildNames(serverParentPath, options);
+ // System.out.println("\t--" + childNames);
+ cache.addToParentChildSet(serverParentPath, childNames);
+
+ return childNames;
+ }
+ finally
+ {
+ cache.unlockWrite();
+ }
+ }
+ }
+
+ // no cache
+ return _baseAccessor.getChildNames(serverParentPath, options);
+ }
+
+ @Override
+ public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+ {
+ List<String> childNames = getChildNames(parentPath, options);
+ if (childNames == null)
+ {
+ return null;
+ }
+
+ List<String> paths = new ArrayList<String>();
+ for (String childName : childNames)
+ {
+ String path = parentPath + "/" + childName;
+ paths.add(path);
+ }
+
+ return get(paths, stats, options);
+ }
+
+ @Override
+ public void subscribeDataChanges(String path, IZkDataListener listener)
+ {
+ String serverPath = prependChroot(path);
+
+ _baseAccessor.subscribeDataChanges(serverPath, listener);
+ }
+
+ @Override
+ public void unsubscribeDataChanges(String path, IZkDataListener listener)
+ {
+ String serverPath = prependChroot(path);
+
+ _baseAccessor.unsubscribeDataChanges(serverPath, listener);
+ }
+
+ @Override
+ public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+ {
+ String serverPath = prependChroot(path);
+
+ return _baseAccessor.subscribeChildChanges(serverPath, listener);
+ }
+
+ @Override
+ public void unsubscribeChildChanges(String path, IZkChildListener listener)
+ {
+ String serverPath = prependChroot(path);
+
+ _baseAccessor.unsubscribeChildChanges(serverPath, listener);
+ }
+
+ @Override
+ public void subscribe(String parentPath, HelixPropertyListener listener)
+ {
+ String serverPath = prependChroot(parentPath);
+ _zkCache.subscribe(serverPath, listener);
+ }
+
+ @Override
+ public void unsubscribe(String parentPath, HelixPropertyListener listener)
+ {
+ String serverPath = prependChroot(parentPath);
+ _zkCache.unsubscribe(serverPath, listener);
+ }
+
+ @Override
+ public void start()
+ {
+
+ LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
+ + ", " + _zkCachePaths);
+
+ // start event thread
+ try
+ {
+ _eventLock.lockInterruptibly();
+ if (_eventThread != null)
+ {
+ LOG.warn(_eventThread + " has already started");
+ }
+ else
+ {
+
+ if (_zkCachePaths == null || _zkCachePaths.isEmpty())
+ {
+ LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
+ }
+ else
+ {
+ LOG.debug("Starting ZkCacheEventThread...");
+
+ _eventThread = new ZkCacheEventThread("");
+ _eventThread.start();
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
+ }
+ finally
+ {
+ _eventLock.unlock();
+ }
+ LOG.debug("Start ZkCacheEventThread...done");
+
+ _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
+ _zkCache =
+ new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
+
+ if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
+ {
+ for (String path : _wtCachePaths)
+ {
+ _cacheMap.put(path, _wtCache);
+ }
+ }
+
+ if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
+ {
+ for (String path : _zkCachePaths)
+ {
+ _cacheMap.put(path, _zkCache);
+ }
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ try
+ {
+ _eventLock.lockInterruptibly();
+
+ if (_zkclient != null)
+ {
+ _zkclient.close();
+ _zkclient = null;
+ }
+
+ if (_eventThread == null)
+ {
+ LOG.warn(_eventThread + " has already stopped");
+ return;
+ }
+
+ LOG.debug("Stopping ZkCacheEventThread...");
+ _eventThread.interrupt();
+ _eventThread.join(2000);
+ _eventThread = null;
+ }
+ catch (InterruptedException e)
+ {
+ LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
+ }
+ finally
+ {
+ _eventLock.unlock();
+ }
+
+ LOG.debug("Stop ZkCacheEventThread...done");
+
+ }
+
+ @Override
+ public void reset()
+ {
+ if (_wtCache != null)
+ {
+ _wtCache.reset();
+ }
+
+ if (_zkCache != null)
+ {
+ _zkCache.reset();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
new file mode 100644
index 0000000..6424719
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheEventThread.java
@@ -0,0 +1,88 @@
+package org.apache.helix.manager.zk;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.log4j.Logger;
+
+// copy from ZkEventThread
+public class ZkCacheEventThread extends Thread
+{
+
+ private static final Logger LOG =
+ Logger.getLogger(ZkCacheEventThread.class);
+ private final BlockingQueue<ZkCacheEvent> _events = new LinkedBlockingQueue<ZkCacheEvent>();
+ private static AtomicInteger _eventId = new AtomicInteger(0);
+
+ static abstract class ZkCacheEvent
+ {
+
+ private final String _description;
+
+ public ZkCacheEvent(String description)
+ {
+ _description = description;
+ }
+
+ public abstract void run() throws Exception;
+
+ @Override
+ public String toString()
+ {
+ return "ZkCacheEvent[" + _description + "]";
+ }
+ }
+
+ ZkCacheEventThread(String name)
+ {
+ setDaemon(true);
+ setName("ZkCache-EventThread-" + getId() + "-" + name);
+ }
+
+ @Override
+ public void run()
+ {
+ LOG.info("Starting ZkCache event thread.");
+ try
+ {
+ while (!isInterrupted())
+ {
+ ZkCacheEvent zkEvent = _events.take();
+ int eventId = _eventId.incrementAndGet();
+ LOG.debug("Delivering event #" + eventId + " " + zkEvent);
+ try
+ {
+ zkEvent.run();
+ }
+ catch (InterruptedException e)
+ {
+ interrupt();
+ }
+ catch (ZkInterruptedException e)
+ {
+ interrupt();
+ }
+ catch (Throwable e)
+ {
+ LOG.error("Error handling event " + zkEvent, e);
+ }
+ LOG.debug("Delivering event #" + eventId + " done");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ LOG.info("Terminate ZkClient event thread.");
+ }
+ }
+
+ public void send(ZkCacheEvent event)
+ {
+ if (!isInterrupted())
+ {
+ LOG.debug("New event: " + event);
+ _events.add(event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
new file mode 100644
index 0000000..36c4fa7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -0,0 +1,348 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ZkCallbackCache<T> extends Cache<T> implements
+ IZkChildListener,
+ IZkDataListener,
+ IZkStateListener
+{
+ private static Logger LOG = Logger.getLogger(ZkCallbackCache.class);
+
+ final BaseDataAccessor<T> _accessor;
+ final String _chrootPath;
+
+ private final ZkCacheEventThread _eventThread;
+ private final Map<String, Set<HelixPropertyListener>> _listener;
+
+ public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath,
+ List<String> paths, ZkCacheEventThread eventThread)
+ {
+ super();
+ _accessor = accessor;
+ _chrootPath = chrootPath;
+
+ _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
+ _eventThread = eventThread;
+
+ // init cache
+ // System.out.println("init cache: " + paths);
+ if (paths != null && !paths.isEmpty())
+ {
+ for (String path : paths)
+ {
+ updateRecursive(path);
+ }
+ }
+ }
+
+ @Override
+ public void update(String path, T data, Stat stat)
+ {
+ String parentPath = new File(path).getParent();
+ String childName = new File(path).getName();
+
+ addToParentChildSet(parentPath, childName);
+ ZNode znode = _cache.get(path);
+ if (znode == null)
+ {
+ _cache.put(path, new ZNode(path, data, stat));
+ fireEvents(path, EventType.NodeCreated);
+ }
+ else
+ {
+ Stat oldStat = znode.getStat();
+
+ znode.setData(data);
+ znode.setStat(stat);
+ // System.out.println("\t\t--setData. path: " + path + ", data: " + data);
+
+ if (oldStat.getCzxid() != stat.getCzxid())
+ {
+ fireEvents(path, EventType.NodeDeleted);
+ fireEvents(path, EventType.NodeCreated);
+ }
+ else if (oldStat.getVersion() != stat.getVersion())
+ {
+ // System.out.println("\t--fireNodeChanged: " + path + ", oldVersion: " +
+ // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
+ fireEvents(path, EventType.NodeDataChanged);
+ }
+ }
+ }
+
+ // TODO: make readData async
+ @Override
+ public void updateRecursive(String path)
+ {
+ if (path == null)
+ {
+ return;
+ }
+
+ try
+ {
+ _lock.writeLock().lock();
+ try
+ {
+ // subscribe changes before read
+ _accessor.subscribeDataChanges(path, this);
+
+ // update this node
+ Stat stat = new Stat();
+ T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+ update(path, readData, stat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ // OK. znode not exists
+ // we still need to subscribe child change
+ }
+
+ // recursively update children nodes if not exists
+ // System.out.println("subcribeChildChange: " + path);
+ ZNode znode = _cache.get(path);
+ List<String> childNames = _accessor.subscribeChildChanges(path, this);
+ if (childNames != null && !childNames.isEmpty())
+ {
+ for (String childName : childNames)
+ {
+ if (!znode.hasChild(childName))
+ {
+ String childPath = path + "/" + childName;
+ znode.addChild(childName);
+ updateRecursive(childPath);
+ }
+ }
+ }
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+ {
+ // System.out.println("handleChildChange: " + parentPath + ", " + currentChilds);
+
+ // this is invoked if subscribed for childChange and node gets deleted
+ if (currentChilds == null)
+ {
+ return;
+ }
+
+ updateRecursive(parentPath);
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception
+ {
+ // System.out.println("handleDataChange: " + dataPath);
+ try
+ {
+ _lock.writeLock().lock();
+
+ // TODO: optimize it by get stat from callback
+ Stat stat = new Stat();
+ Object readData =
+ _accessor.get(dataPath, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+ ZNode znode = _cache.get(dataPath);
+ if (znode != null)
+ {
+ Stat oldStat = znode.getStat();
+
+ // System.out.println("handleDataChange: " + dataPath + ", data: " + data);
+ // System.out.println("handleDataChange: " + dataPath + ", oldCzxid: " +
+ // oldStat.getCzxid() + ", newCzxid: " + stat.getCzxid()
+ // + ", oldVersion: " + oldStat.getVersion() + ", newVersion: " +
+ // stat.getVersion());
+ znode.setData(readData);
+ znode.setStat(stat);
+
+ // if create right after delete, and zkCallback comes after create
+ // no DataDelete() will be fired, instead will fire 2 DataChange()
+ // see ZkClient.fireDataChangedEvents()
+ if (oldStat.getCzxid() != stat.getCzxid())
+ {
+ fireEvents(dataPath, EventType.NodeDeleted);
+ fireEvents(dataPath, EventType.NodeCreated);
+ }
+ else if (oldStat.getVersion() != stat.getVersion())
+ {
+ // System.out.println("\t--fireNodeChanged: " + dataPath + ", oldVersion: " +
+ // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
+ fireEvents(dataPath, EventType.NodeDataChanged);
+ }
+ }
+ else
+ {
+ // we may see dataChange on child before childChange on parent
+ // in this case, let childChange update cache
+ }
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception
+ {
+ // System.out.println("handleDataDeleted: " + dataPath);
+
+ try
+ {
+ _lock.writeLock().lock();
+ _accessor.unsubscribeDataChanges(dataPath, this);
+ _accessor.unsubscribeChildChanges(dataPath, this);
+
+ String parentPath = new File(dataPath).getParent();
+ String name = new File(dataPath).getName();
+ removeFromParentChildSet(parentPath, name);
+ _cache.remove(dataPath);
+
+ fireEvents(dataPath, EventType.NodeDeleted);
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void subscribe(String path, HelixPropertyListener listener)
+ {
+ synchronized (_listener)
+ {
+ Set<HelixPropertyListener> listeners = _listener.get(path);
+ if (listeners == null)
+ {
+ listeners = new CopyOnWriteArraySet<HelixPropertyListener>();
+ _listener.put(path, listeners);
+ }
+ listeners.add(listener);
+ }
+ }
+
+ public void unsubscribe(String path, HelixPropertyListener childListener)
+ {
+ synchronized (_listener)
+ {
+ final Set<HelixPropertyListener> listeners = _listener.get(path);
+ if (listeners != null)
+ {
+ listeners.remove(childListener);
+ }
+ }
+ }
+
+ private void fireEvents(final String path, EventType type)
+ {
+ String tmpPath = path;
+ final String clientPath =
+ (_chrootPath == null ? path : (_chrootPath.equals(path) ? "/"
+ : path.substring(_chrootPath.length())));
+
+ while (tmpPath != null)
+ {
+ Set<HelixPropertyListener> listeners = _listener.get(tmpPath);
+
+ if (listeners != null && !listeners.isEmpty())
+ {
+ for (final HelixPropertyListener listener : listeners)
+ {
+ try
+ {
+ switch (type)
+ {
+ case NodeDataChanged:
+ // listener.onDataChange(path);
+ _eventThread.send(new ZkCacheEvent("dataChange on " + path + " send to "
+ + listener)
+ {
+ @Override
+ public void run() throws Exception
+ {
+ listener.onDataChange(clientPath);
+ }
+ });
+ break;
+ case NodeCreated:
+ // listener.onDataCreate(path);
+ _eventThread.send(new ZkCacheEvent("dataCreate on " + path + " send to "
+ + listener)
+ {
+ @Override
+ public void run() throws Exception
+ {
+ listener.onDataCreate(clientPath);
+ }
+ });
+ break;
+ case NodeDeleted:
+ // listener.onDataDelete(path);
+ _eventThread.send(new ZkCacheEvent("dataDelete on " + path + " send to "
+ + listener)
+ {
+ @Override
+ public void run() throws Exception
+ {
+ listener.onDataDelete(clientPath);
+ }
+ });
+ break;
+ default:
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception in handle events.", e);
+ }
+ }
+ }
+
+ tmpPath = new File(tmpPath).getParent();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
new file mode 100644
index 0000000..e1531a1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -0,0 +1,445 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * ZKClient does not provide some functionalities, this will be used for quick fixes if
+ * any bug found in ZKClient or if we need additional features but can't wait for the new
+ * ZkClient jar Ideally we should commit the changes we do here to ZKClient.
+ *
+ * @author kgopalak
+ *
+ */
+
+public class ZkClient extends org.I0Itec.zkclient.ZkClient
+{
+ private static Logger LOG = Logger.getLogger(ZkClient.class);
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
+ public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+ // public static String sessionId;
+ // public static String sessionPassword;
+
+ private PathBasedZkSerializer _zkSerializer;
+
+ public ZkClient(IZkConnection connection, int connectionTimeout,
+ PathBasedZkSerializer zkSerializer)
+ {
+ super(connection, connectionTimeout, new ByteArraySerializer());
+ _zkSerializer = zkSerializer;
+
+ StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+ LOG.info("create a new zkclient. " + Arrays.asList(calls));
+ }
+
+ public ZkClient(IZkConnection connection, int connectionTimeout,
+ ZkSerializer zkSerializer)
+ {
+ this(connection, connectionTimeout, new BasicZkSerializer(zkSerializer));
+ }
+
+ public ZkClient(IZkConnection connection, int connectionTimeout)
+ {
+ this(connection, connectionTimeout, new SerializableSerializer());
+ }
+
+ public ZkClient(IZkConnection connection)
+ {
+ this(connection, Integer.MAX_VALUE, new SerializableSerializer());
+ }
+
+ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+ ZkSerializer zkSerializer)
+ {
+ this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
+ }
+
+ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+ PathBasedZkSerializer zkSerializer)
+ {
+ this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
+ }
+
+ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
+ {
+ this(new ZkConnection(zkServers, sessionTimeout),
+ connectionTimeout,
+ new SerializableSerializer());
+ }
+
+ public ZkClient(String zkServers, int connectionTimeout)
+ {
+ this(new ZkConnection(zkServers), connectionTimeout, new SerializableSerializer());
+ }
+
+ public ZkClient(String zkServers)
+ {
+ this(new ZkConnection(zkServers), Integer.MAX_VALUE, new SerializableSerializer());
+ }
+
+ {
+ }
+
+ @Override
+ public void setZkSerializer(ZkSerializer zkSerializer)
+ {
+ _zkSerializer = new BasicZkSerializer(zkSerializer);
+ }
+
+ public void setZkSerializer(PathBasedZkSerializer zkSerializer)
+ {
+ _zkSerializer = zkSerializer;
+ }
+
+ public IZkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public void close() throws ZkInterruptedException
+ {
+ StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+ LOG.info("closing a zkclient. zookeeper: "
+ + (_connection == null ? "null" : ((ZkConnection) _connection).getZookeeper())
+ + ", callStack: " + Arrays.asList(calls));
+
+ super.close();
+ }
+
+ public Stat getStat(final String path)
+ {
+ long startT = System.nanoTime();
+
+ try
+ {
+ Stat stat = retryUntilConnected(new Callable<Stat>()
+ {
+
+ @Override
+ public Stat call() throws Exception
+ {
+ Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+ return stat;
+ }
+ });
+
+ return stat;
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ // override exists(path, watch), so we can record all exists requests
+ @Override
+ protected boolean exists(final String path, final boolean watch)
+ {
+ long startT = System.nanoTime();
+
+ try
+ {
+ return retryUntilConnected(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return _connection.exists(path, watch);
+ }
+ });
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ // override getChildren(path, watch), so we can record all getChildren requests
+ @Override
+ protected List<String> getChildren(final String path, final boolean watch)
+ {
+ long startT = System.nanoTime();
+
+ try
+ {
+ return retryUntilConnected(new Callable<List<String>>()
+ {
+ @Override
+ public List<String> call() throws Exception
+ {
+ return _connection.getChildren(path, watch);
+ }
+ });
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("getChildren, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Object> T deserialize(byte[] data, String path)
+ {
+ if (data == null)
+ {
+ return null;
+ }
+ return (T) _zkSerializer.deserialize(data, path);
+ }
+
+ // override readData(path, stat, watch), so we can record all read requests
+ @Override
+ @SuppressWarnings("unchecked")
+ protected <T extends Object> T readData(final String path,
+ final Stat stat,
+ final boolean watch)
+ {
+ long startT = System.nanoTime();
+ try
+ {
+ byte[] data = retryUntilConnected(new Callable<byte[]>()
+ {
+
+ @Override
+ public byte[] call() throws Exception
+ {
+ return _connection.readData(path, stat, watch);
+ }
+ });
+ return (T) deserialize(data, path);
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("getData, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Object> T readDataAndStat(String path,
+ Stat stat,
+ boolean returnNullIfPathNotExists)
+ {
+ T data = null;
+ try
+ {
+ data = (T) super.readData(path, stat);
+ }
+ catch (ZkNoNodeException e)
+ {
+ if (!returnNullIfPathNotExists)
+ {
+ throw e;
+ }
+ }
+ return data;
+ }
+
+ public String getServers()
+ {
+ return _connection.getServers();
+ }
+
+ public byte[] serialize(Object data, String path)
+ {
+ return _zkSerializer.serialize(data, path);
+ }
+
+ @Override
+ public void writeData(final String path, Object datat, final int expectedVersion)
+ {
+ long startT = System.nanoTime();
+ try
+ {
+ final byte[] data = serialize(datat, path);
+
+ retryUntilConnected(new Callable<Object>()
+ {
+
+ @Override
+ public Object call() throws Exception
+ {
+ _connection.writeData(path, data, expectedVersion);
+ return null;
+ }
+ });
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("setData, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) throws InterruptedException
+ {
+ Stat stat = null;
+ long start = System.nanoTime();
+ try
+ {
+ byte[] bytes = _zkSerializer.serialize(datat, path);
+ stat =
+ ((ZkConnection) _connection).getZookeeper().setData(path,
+ bytes,
+ expectedVersion);
+ return stat;
+ }
+ catch (KeeperException e)
+ {
+ throw ZkException.create(e);
+ }
+ finally
+ {
+ long end = System.nanoTime();
+ LOG.info("setData, path: " + path + ", time: " + (end - start) + " ns");
+ }
+ }
+
+ @Override
+ public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException,
+ IllegalArgumentException,
+ ZkException,
+ RuntimeException
+ {
+ if (path == null)
+ {
+ throw new NullPointerException("path must not be null.");
+ }
+
+ long startT = System.nanoTime();
+ try
+ {
+ final byte[] bytes = data == null ? null : serialize(data, path);
+
+ return retryUntilConnected(new Callable<String>()
+ {
+
+ @Override
+ public String call() throws Exception
+ {
+ return _connection.create(path, bytes, mode);
+ }
+ });
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("create, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ @Override
+ public boolean delete(final String path)
+ {
+ long startT = System.nanoTime();
+ try
+ {
+ try
+ {
+ retryUntilConnected(new Callable<Object>()
+ {
+
+ @Override
+ public Object call() throws Exception
+ {
+ _connection.delete(path);
+ return null;
+ }
+ });
+
+ return true;
+ }
+ catch (ZkNoNodeException e)
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ long endT = System.nanoTime();
+ LOG.info("delete, path: " + path + ", time: " + (endT - startT) + " ns");
+ }
+ }
+
+ public void asyncCreate(final String path,
+ Object datat,
+ CreateMode mode,
+ CreateCallbackHandler cb)
+ {
+ byte[] data = null;
+ if (datat != null)
+ {
+ data = serialize(datat, path);
+ }
+ ((ZkConnection) _connection).getZookeeper().create(path, data, Ids.OPEN_ACL_UNSAFE, // Arrays.asList(DEFAULT_ACL),
+ mode,
+ cb,
+ null);
+ }
+
+ public void asyncSetData(final String path,
+ Object datat,
+ int version,
+ SetDataCallbackHandler cb)
+ {
+ final byte[] data = serialize(datat, path);
+ ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, null);
+
+ }
+
+ public void asyncGetData(final String path, GetDataCallbackHandler cb)
+ {
+ ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, null);
+ }
+
+ public void asyncExists(final String path, ExistsCallbackHandler cb)
+ {
+ ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, null);
+
+ }
+
+ public void asyncDelete(String path, DeleteCallbackHandler cb)
+ {
+ ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
new file mode 100644
index 0000000..d830449
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class ZkStateChangeListener implements IZkStateListener
+{
+ private volatile boolean _isConnected;
+ private volatile boolean _hasSessionExpired;
+ private final ZKHelixManager _zkHelixManager;
+
+ private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
+
+ public ZkStateChangeListener(ZKHelixManager zkHelixManager)
+ {
+ this._zkHelixManager = zkHelixManager;
+
+ }
+
+ @Override
+ public void handleNewSession()
+ {
+ // TODO:bug in zkclient .
+ // zkclient does not invoke handleStateChanged when a session expires but
+ // directly invokes handleNewSession
+ _isConnected = true;
+ _hasSessionExpired = false;
+ _zkHelixManager.handleNewSession();
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState keeperState) throws Exception
+ {
+ switch (keeperState)
+ {
+ case SyncConnected:
+ ZkConnection zkConnection =
+ ((ZkConnection) _zkHelixManager._zkClient.getConnection());
+ logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
+ _isConnected = true;
+ break;
+ case Disconnected:
+ logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
+ + _zkHelixManager._sessionId + ", instance: "
+ + _zkHelixManager.getInstanceName() + ", type: "
+ + _zkHelixManager.getInstanceType());
+
+ _isConnected = false;
+ break;
+ case Expired:
+ logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
+ + _zkHelixManager._sessionId + ", instance: "
+ + _zkHelixManager.getInstanceName() + ", type: "
+ + _zkHelixManager.getInstanceType());
+
+ _isConnected = false;
+ _hasSessionExpired = true;
+ break;
+ }
+ }
+
+ boolean isConnected()
+ {
+ return _isConnected;
+ }
+
+ void disconnect()
+ {
+ _isConnected = false;
+ }
+
+ boolean hasSessionExpired()
+ {
+ return _hasSessionExpired;
+ }
+}