You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[18/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
deleted file mode 100644
index fd1ec11..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkBaseDataAccessor.java
+++ /dev/null
@@ -1,1243 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataTree;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
-{
- enum RetCode
- {
- OK, NODE_EXISTS, ERROR
- }
-
- private static Logger LOG = Logger.getLogger(ZkBaseDataAccessor.class);
-
- private final ZkClient _zkClient;
-
- public ZkBaseDataAccessor(ZkClient zkClient)
- {
- _zkClient = zkClient;
- }
-
- /**
- * sync create
- */
- @Override
- public boolean create(String path, T record, int options)
- {
- return create(path, record, null, options) == RetCode.OK;
- }
-
- /**
- * sync create
- */
- public RetCode create(String path, T record, List<String> pathCreated, int options)
- {
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid create mode. options: " + options);
- return RetCode.ERROR;
- }
-
- boolean retry;
- do
- {
- retry = false;
- try
- {
- _zkClient.create(path, record, mode);
- if (pathCreated != null)
- pathCreated.add(path);
-
- return RetCode.OK;
- }
- catch (ZkNoNodeException e)
- {
- // this will happen if parent node does not exist
- String parentPath = new File(path).getParent();
- try
- {
- RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
- if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
- {
- // if parent node created/exists, retry
- retry = true;
- }
- }
- catch (Exception e1)
- {
- LOG.error("Exception while creating path: " + parentPath, e1);
- return RetCode.ERROR;
- }
- }
- catch (ZkNodeExistsException e)
- {
- LOG.warn("Node already exists. path: " + path);
- return RetCode.NODE_EXISTS;
- }
- catch (Exception e)
- {
- LOG.error("Exception while creating path: " + path, e);
- return RetCode.ERROR;
- }
- }
- while (retry);
-
- return RetCode.OK;
- }
-
- /**
- * sync set
- */
- @Override
- public boolean set(String path, T record, int options)
- {
- return set(path, record, null, null, -1, options);
- }
-
- /**
- * sync set
- *
- * @param setstat
- * : if node is created instead of set, stat will NOT be set
- */
- public boolean set(String path,
- T record,
- List<String> pathsCreated,
- Stat setstat,
- int expectVersion,
- int options)
- {
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid set mode. options: " + options);
- return false;
- }
-
- boolean retry;
- do
- {
- retry = false;
- try
- {
- // _zkClient.writeData(path, record);
- Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
- if (setstat != null)
- DataTree.copyStat(setStat, setstat);
- }
- catch (ZkNoNodeException e)
- {
- // node not exists, try create. in this case, stat will not be set
- try
- {
- RetCode rc = create(path, record, pathsCreated, options);
- // if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
- // retry = true;
- switch (rc)
- {
- case OK:
- // not set stat if node is created (instead of set)
- break;
- case NODE_EXISTS:
- retry = true;
- break;
- default:
- LOG.error("Fail to set path by creating: " + path);
- return false;
- }
- }
- catch (Exception e1)
- {
- LOG.error("Exception while setting path by creating: " + path, e);
- return false;
- }
- }
- catch (ZkBadVersionException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- LOG.error("Exception while setting path: " + path, e);
- return false;
- }
- }
- while (retry);
-
- return true;
- }
-
- /**
- * sync update
- */
- @Override
- public boolean update(String path, DataUpdater<T> updater, int options)
- {
- return update(path, updater, null, null, options) != null;
- }
-
- /**
- * sync update
- *
- * @return: updatedData on success, or null on fail
- */
- public T update(String path,
- DataUpdater<T> updater,
- List<String> createPaths,
- Stat stat,
- int options)
- {
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid update mode. options: " + options);
- return null;
- }
-
- boolean retry;
- T updatedData = null;
- do
- {
- retry = false;
- try
- {
- Stat readStat = new Stat();
- T oldData = (T) _zkClient.readData(path, readStat);
- T newData = updater.update(oldData);
- Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
- if (stat != null)
- {
- DataTree.copyStat(setStat, stat);
- }
-
- updatedData = newData;
- }
- catch (ZkBadVersionException e)
- {
- retry = true;
- }
- catch (ZkNoNodeException e)
- {
- // node not exist, try create
- try
- {
- T newData = updater.update(null);
- RetCode rc = create(path, newData, createPaths, options);
- switch (rc)
- {
- case OK:
- updatedData = newData;
- break;
- case NODE_EXISTS:
- retry = true;
- break;
- default:
- LOG.error("Fail to update path by creating: " + path);
- return null;
- }
- }
- catch (Exception e1)
- {
- LOG.error("Exception while updating path by creating: " + path, e1);
- return null;
- }
- }
- catch (Exception e)
- {
- LOG.error("Exception while updating path: " + path, e);
- return null;
- }
- }
- while (retry);
-
- return updatedData;
- }
-
- /**
- * sync get
- *
- */
- @Override
- public T get(String path, Stat stat, int options)
- {
- T data = null;
- try
- {
- data = (T) _zkClient.readData(path, stat);
- }
- catch (ZkNoNodeException e)
- {
- if (AccessOption.isThrowExceptionIfNotExist(options))
- {
- throw e;
- }
- }
- return data;
- }
-
- /**
- * async get
- *
- */
- @Override
- public List<T> get(List<String> paths, List<Stat> stats, int options)
- {
- boolean[] needRead = new boolean[paths.size()];
- Arrays.fill(needRead, true);
-
- return get(paths, stats, needRead);
- }
-
- /**
- * async get
- */
- List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
- {
- if (paths == null || paths.size() == 0)
- {
- return Collections.emptyList();
- }
-
- // init stats
- if (stats != null)
- {
- stats.clear();
- stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
- }
-
- long startT = System.nanoTime();
-
- try
- {
- // issue asyn get requests
- GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needRead[i])
- continue;
-
- String path = paths.get(i);
- cbList[i] = new GetDataCallbackHandler();
- _zkClient.asyncGetData(path, cbList[i]);
- }
-
- // wait for completion
- for (int i = 0; i < cbList.length; i++)
- {
- if (!needRead[i])
- continue;
-
- GetDataCallbackHandler cb = cbList[i];
- cb.waitForSuccess();
- }
-
- // construct return results
- List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needRead[i])
- continue;
-
- GetDataCallbackHandler cb = cbList[i];
- if (Code.get(cb.getRc()) == Code.OK)
- {
- @SuppressWarnings("unchecked")
- T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
- records.set(i, record);
- if (stats != null)
- {
- stats.set(i, cb._stat);
- }
- }
- }
-
- return records;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
- }
-
- /**
- * asyn getChildren
- *
- */
- @Override
- public List<T> getChildren(String parentPath, List<Stat> stats, int options)
- {
- try
- {
- // prepare child paths
- List<String> childNames = getChildNames(parentPath, options);
- if (childNames == null || childNames.size() == 0)
- {
- return Collections.emptyList();
- }
-
- List<String> paths = new ArrayList<String>();
- for (String childName : childNames)
- {
- String path = parentPath + "/" + childName;
- paths.add(path);
- }
-
- // remove null record
- List<Stat> curStats = new ArrayList<Stat>(paths.size());
- List<T> records = get(paths, curStats, options);
- Iterator<T> recordIter = records.iterator();
- Iterator<Stat> statIter = curStats.iterator();
- while (statIter.hasNext())
- {
- recordIter.next();
- if (statIter.next() == null)
- {
- statIter.remove();
- recordIter.remove();
- }
- }
-
- if (stats != null)
- {
- stats.clear();
- stats.addAll(curStats);
- }
-
- return records;
- }
- catch (ZkNoNodeException e)
- {
- return Collections.emptyList();
- }
- }
-
- /**
- * sync getChildNames
- *
- * @return null if parentPath doesn't exist
- */
- @Override
- public List<String> getChildNames(String parentPath, int options)
- {
- try
- {
- List<String> childNames = _zkClient.getChildren(parentPath);
- Collections.sort(childNames);
- return childNames;
- }
- catch (ZkNoNodeException e)
- {
- return null;
- }
- }
-
- /**
- * sync exists
- *
- */
- @Override
- public boolean exists(String path, int options)
- {
- return _zkClient.exists(path);
- }
-
- /**
- * sync getStat
- *
- */
- @Override
- public Stat getStat(String path, int options)
- {
- return _zkClient.getStat(path);
- }
-
- /**
- * sync remove
- *
- */
- @Override
- public boolean remove(String path, int options)
- {
- try
- {
- // optimize on common path
- _zkClient.delete(path);
- }
- catch (ZkException e)
- {
- _zkClient.deleteRecursive(path);
- }
- return true;
- }
-
- /**
- * async create. give up on error other than NONODE
- *
- */
- CreateCallbackHandler[] create(List<String> paths,
- List<T> records,
- boolean[] needCreate,
- List<List<String>> pathsCreated,
- int options)
- {
- if ((records != null && records.size() != paths.size())
- || needCreate.length != paths.size()
- || (pathsCreated != null && pathsCreated.size() != paths.size()))
- {
- throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
- }
-
- CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
-
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid async set mode. options: " + options);
- return cbList;
- }
-
- boolean retry;
- do
- {
- retry = false;
-
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needCreate[i])
- continue;
-
- String path = paths.get(i);
- T record = records == null ? null : records.get(i);
- cbList[i] = new CreateCallbackHandler();
- _zkClient.asyncCreate(path, record, mode, cbList[i]);
- }
-
- List<String> parentPaths =
- new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
- boolean failOnNoNode = false;
-
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needCreate[i])
- continue;
-
- CreateCallbackHandler cb = cbList[i];
- cb.waitForSuccess();
- String path = paths.get(i);
-
- if (Code.get(cb.getRc()) == Code.NONODE)
- {
- String parentPath = new File(path).getParent();
- parentPaths.set(i, parentPath);
- failOnNoNode = true;
- }
- else
- {
- // if create succeed or fail on error other than NONODE,
- // give up
- needCreate[i] = false;
-
- // if succeeds, record what paths we've created
- if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
- {
- if (pathsCreated.get(i) == null)
- {
- pathsCreated.set(i, new ArrayList<String>());
- }
- pathsCreated.get(i).add(path);
- }
- }
- }
-
- if (failOnNoNode)
- {
- boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
-
- CreateCallbackHandler[] parentCbList =
- create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
- for (int i = 0; i < parentCbList.length; i++)
- {
- CreateCallbackHandler parentCb = parentCbList[i];
- if (parentCb == null)
- continue;
-
- Code rc = Code.get(parentCb.getRc());
-
- // if parent is created, retry create child
- if (rc == Code.OK || rc == Code.NODEEXISTS)
- {
- retry = true;
- break;
- }
- }
- }
- }
- while (retry);
-
- return cbList;
- }
-
-
- /**
- * async create
- *
- * TODO: rename to create
- */
- @Override
- public boolean[] createChildren(List<String> paths, List<T> records, int options)
- {
- boolean[] success = new boolean[paths.size()];
-
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid async create mode. options: " + options);
- return success;
- }
-
- boolean[] needCreate = new boolean[paths.size()];
- Arrays.fill(needCreate, true);
- List<List<String>> pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
-
- long startT = System.nanoTime();
- try
- {
-
- CreateCallbackHandler[] cbList =
- create(paths, records, needCreate, pathsCreated, options);
-
- for (int i = 0; i < cbList.length; i++)
- {
- CreateCallbackHandler cb = cbList[i];
- success[i] = (Code.get(cb.getRc()) == Code.OK);
- }
-
- return success;
-
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
- }
-
- /**
- * async set
- *
- * TODO: rename to set
- *
- */
- @Override
- public boolean[] setChildren(List<String> paths, List<T> records, int options)
- {
- return set(paths, records, null, null, options);
- }
-
- /**
- * async set, give up on error other than NoNode
- *
- */
- boolean[] set(List<String> paths,
- List<T> records,
- List<List<String>> pathsCreated,
- List<Stat> stats,
- int options)
- {
- if (paths == null || paths.size() == 0)
- {
- return new boolean[0];
- }
-
- if ((records != null && records.size() != paths.size())
- || (pathsCreated != null && pathsCreated.size() != paths.size()))
- {
- throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
- }
-
- boolean[] success = new boolean[paths.size()];
-
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid async set mode. options: " + options);
- return success;
- }
-
- List<Stat> setStats =
- new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
- SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
- CreateCallbackHandler[] createCbList = null;
- boolean[] needSet = new boolean[paths.size()];
- Arrays.fill(needSet, true);
-
- long startT = System.nanoTime();
-
- try
- {
- boolean retry;
- do
- {
- retry = false;
-
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needSet[i])
- continue;
-
- String path = paths.get(i);
- T record = records.get(i);
- cbList[i] = new SetDataCallbackHandler();
- _zkClient.asyncSetData(path, record, -1, cbList[i]);
-
- }
-
- boolean failOnNoNode = false;
-
- for (int i = 0; i < cbList.length; i++)
- {
- SetDataCallbackHandler cb = cbList[i];
- cb.waitForSuccess();
- Code rc = Code.get(cb.getRc());
- switch (rc)
- {
- case OK:
- setStats.set(i, cb.getStat());
- needSet[i] = false;
- break;
- case NONODE:
- // if fail on NoNode, try create the node
- failOnNoNode = true;
- break;
- default:
- // if fail on error other than NoNode, give up
- needSet[i] = false;
- break;
- }
- }
-
- // if failOnNoNode, try create
- if (failOnNoNode)
- {
- boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
- createCbList = create(paths, records, needCreate, pathsCreated, options);
- for (int i = 0; i < createCbList.length; i++)
- {
- CreateCallbackHandler createCb = createCbList[i];
- if (createCb == null)
- {
- continue;
- }
-
- Code rc = Code.get(createCb.getRc());
- switch (rc)
- {
- case OK:
- setStats.set(i, ZNode.ZERO_STAT);
- needSet[i] = false;
- break;
- case NODEEXISTS:
- retry = true;
- break;
- default:
- // if creation fails on error other than NodeExists
- // no need to retry set
- needSet[i] = false;
- break;
- }
- }
- }
- }
- while (retry);
-
- // construct return results
- for (int i = 0; i < cbList.length; i++)
- {
- SetDataCallbackHandler cb = cbList[i];
-
- Code rc = Code.get(cb.getRc());
- if (rc == Code.OK)
- {
- success[i] = true;
- }
- else if (rc == Code.NONODE)
- {
- CreateCallbackHandler createCb = createCbList[i];
- if (Code.get(createCb.getRc()) == Code.OK)
- {
- success[i] = true;
- }
- }
- }
-
- if (stats != null)
- {
- stats.clear();
- stats.addAll(setStats);
- }
-
- return success;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
- }
-
- // TODO: rename to update
- /**
- * async update
- */
- @Override
- public boolean[] updateChildren(List<String> paths,
- List<DataUpdater<T>> updaters,
- int options)
- {
-
- List<T> updateData = update(paths, updaters, null, null, options);
- boolean[] success = new boolean[paths.size()]; // init to false
- for (int i = 0; i < paths.size(); i++)
- {
- T data = updateData.get(i);
- success[i] = (data != null);
- }
- return success;
- }
-
- /**
- * async update
- *
- * return: updatedData on success or null on fail
- */
- List<T> update(List<String> paths,
- List<DataUpdater<T>> updaters,
- List<List<String>> pathsCreated,
- List<Stat> stats,
- int options)
- {
- if (paths == null || paths.size() == 0)
- {
- LOG.error("paths is null or empty");
- return Collections.emptyList();
- }
-
- if (updaters.size() != paths.size()
- || (pathsCreated != null && pathsCreated.size() != paths.size()))
- {
- throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
- }
-
- List<Stat> setStats =
- new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
- List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
- CreateMode mode = AccessOption.getMode(options);
- if (mode == null)
- {
- LOG.error("Invalid update mode. options: " + options);
- return updateData;
- }
-
- SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
- CreateCallbackHandler[] createCbList = null;
- boolean[] needUpdate = new boolean[paths.size()];
- Arrays.fill(needUpdate, true);
-
- long startT = System.nanoTime();
-
- try
- {
- boolean retry;
- do
- {
- retry = false;
- boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
- boolean failOnNoNode = false;
-
- // asycn read all data
- List<Stat> curStats = new ArrayList<Stat>();
- List<T> curDataList =
- get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
-
- // async update
- List<T> newDataList = new ArrayList<T>();
- for (int i = 0; i < paths.size(); i++)
- {
- if (!needUpdate[i])
- {
- newDataList.add(null);
- continue;
- }
- String path = paths.get(i);
- DataUpdater<T> updater = updaters.get(i);
- T newData = updater.update(curDataList.get(i));
- newDataList.add(newData);
- Stat curStat = curStats.get(i);
- if (curStat == null)
- {
- // node not exists
- failOnNoNode = true;
- needCreate[i] = true;
- }
- else
- {
- cbList[i] = new SetDataCallbackHandler();
- _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
- }
- }
-
- // wait for completion
- boolean failOnBadVersion = false;
-
- for (int i = 0; i < paths.size(); i++)
- {
- SetDataCallbackHandler cb = cbList[i];
- if (cb == null)
- continue;
-
- cb.waitForSuccess();
-
- switch (Code.get(cb.getRc()))
- {
- case OK:
- updateData.set(i, newDataList.get(i));
- setStats.set(i, cb.getStat());
- needUpdate[i] = false;
- break;
- case NONODE:
- failOnNoNode = true;
- needCreate[i] = true;
- break;
- case BADVERSION:
- failOnBadVersion = true;
- break;
- default:
- // if fail on error other than NoNode or BadVersion
- // will not retry
- needUpdate[i] = false;
- break;
- }
- }
-
- // if failOnNoNode, try create
- if (failOnNoNode)
- {
- createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
- for (int i = 0; i < paths.size(); i++)
- {
- CreateCallbackHandler createCb = createCbList[i];
- if (createCb == null)
- {
- continue;
- }
-
- switch (Code.get(createCb.getRc()))
- {
- case OK:
- needUpdate[i] = false;
- updateData.set(i, newDataList.get(i));
- setStats.set(i, ZNode.ZERO_STAT);
- break;
- case NODEEXISTS:
- retry = true;
- break;
- default:
- // if fail on error other than NodeExists
- // will not retry
- needUpdate[i] = false;
- break;
- }
- }
- }
-
- // if failOnBadVersion, retry
- if (failOnBadVersion)
- {
- retry = true;
- }
- }
- while (retry);
-
- if (stats != null)
- {
- stats.clear();
- stats.addAll(setStats);
- }
-
- return updateData;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
-
- }
-
- /**
- * async exists
- *
- */
- @Override
- public boolean[] exists(List<String> paths, int options)
- {
- Stat[] stats = getStats(paths, options);
-
- boolean[] exists = new boolean[paths.size()];
- for (int i = 0; i < paths.size(); i++)
- {
- exists[i] = (stats[i] != null);
- }
-
- return exists;
- }
-
- /**
- * async getStat
- *
- */
- @Override
- public Stat[] getStats(List<String> paths, int options)
- {
- if (paths == null || paths.size() == 0)
- {
- LOG.error("paths is null or empty");
- return new Stat[0];
- }
-
- Stat[] stats = new Stat[paths.size()];
-
- long startT = System.nanoTime();
-
- try
- {
- ExistsCallbackHandler[] cbList = new ExistsCallbackHandler[paths.size()];
- for (int i = 0; i < paths.size(); i++)
- {
- String path = paths.get(i);
- cbList[i] = new ExistsCallbackHandler();
- _zkClient.asyncExists(path, cbList[i]);
- }
-
- for (int i = 0; i < cbList.length; i++)
- {
- ExistsCallbackHandler cb = cbList[i];
- cb.waitForSuccess();
- stats[i] = cb._stat;
- }
-
- return stats;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
- }
-
- /**
- * async remove
- *
- */
- @Override
- public boolean[] remove(List<String> paths, int options)
- {
- if (paths == null || paths.size() == 0)
- {
- return new boolean[0];
- }
-
- boolean[] success = new boolean[paths.size()];
-
- DeleteCallbackHandler[] cbList = new DeleteCallbackHandler[paths.size()];
-
- long startT = System.nanoTime();
-
- try
- {
- for (int i = 0; i < paths.size(); i++)
- {
- String path = paths.get(i);
- cbList[i] = new DeleteCallbackHandler();
- _zkClient.asyncDelete(path, cbList[i]);
- }
-
- for (int i = 0; i < cbList.length; i++)
- {
- DeleteCallbackHandler cb = cbList[i];
- cb.waitForSuccess();
- success[i] = (cb.getRc() == 0);
- }
-
- return success;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
- }
- }
-
- /**
- * Subscribe to zookeeper data changes
- */
- @Override
- public void subscribeDataChanges(String path, IZkDataListener listener)
- {
- _zkClient.subscribeDataChanges(path, listener);
- }
-
- /**
- * Unsubscribe to zookeeper data changes
- */
- @Override
- public void unsubscribeDataChanges(String path, IZkDataListener dataListener)
- {
- _zkClient.unsubscribeDataChanges(path, dataListener);
- }
-
- /**
- * Subscrie to zookeeper data changes
- */
- @Override
- public List<String> subscribeChildChanges(String path, IZkChildListener listener)
- {
- return _zkClient.subscribeChildChanges(path, listener);
- }
-
- /**
- * Unsubscrie to zookeeper data changes
- */
- @Override
- public void unsubscribeChildChanges(String path, IZkChildListener childListener)
- {
- _zkClient.unsubscribeChildChanges(path, childListener);
- }
-
- // simple test
- public static void main(String[] args)
- {
- ZkClient zkclient = new ZkClient("localhost:2191");
- zkclient.setZkSerializer(new ZNRecordSerializer());
- ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
- // test async create
- List<String> createPaths =
- Arrays.asList("/test/child1/child1", "/test/child2/child2");
- List<ZNRecord> createRecords =
- Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
-
- boolean[] needCreate = new boolean[createPaths.size()];
- Arrays.fill(needCreate, true);
- List<List<String>> pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
- null));
- accessor.create(createPaths,
- createRecords,
- needCreate,
- pathsCreated,
- AccessOption.PERSISTENT);
- System.out.println("pathsCreated: " + pathsCreated);
-
- // test async set
- List<String> setPaths =
- Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
- List<ZNRecord> setRecords =
- Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
-
- pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
- null));
- boolean[] success =
- accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
- System.out.println("pathsCreated: " + pathsCreated);
- System.out.println("setSuccess: " + Arrays.toString(success));
-
- // test async update
- List<String> updatePaths =
- Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
- class TestUpdater implements DataUpdater<ZNRecord>
- {
- final ZNRecord _newData;
-
- public TestUpdater(ZNRecord newData)
- {
- _newData = newData;
- }
-
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- return _newData;
-
- }
- }
- List<DataUpdater<ZNRecord>> updaters =
- Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
- (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
-
- pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
- null));
-
- List<ZNRecord> updateRecords =
- accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
- for (int i = 0; i < updatePaths.size(); i++)
- {
- success[i] = updateRecords.get(i) != null;
- }
- System.out.println("pathsCreated: " + pathsCreated);
- System.out.println("updateSuccess: " + Arrays.toString(success));
-
- System.out.println("CLOSING");
- zkclient.close();
- }
-
- /**
- * Reset
- */
- @Override
- public void reset()
- {
- // Nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
deleted file mode 100644
index 41f8bab..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ /dev/null
@@ -1,984 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataTree;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor.RetCode;
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.HelixPropertyStore;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
-{
- private static final Logger LOG =
- Logger.getLogger(ZkCacheBaseDataAccessor.class);
-
- protected WriteThroughCache<T> _wtCache;
- protected ZkCallbackCache<T> _zkCache;
-
- final ZkBaseDataAccessor<T> _baseAccessor;
- final Map<String, Cache<T>> _cacheMap;
-
- final String _chrootPath;
- final List<String> _wtCachePaths;
- final List<String> _zkCachePaths;
-
- final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
-
- // fire listeners
- private final ReentrantLock _eventLock = new ReentrantLock();
- private ZkCacheEventThread _eventThread;
-
- private ZkClient _zkclient = null;
-
- public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
- List<String> wtCachePaths)
- {
- this(baseAccessor, null, wtCachePaths, null);
- }
-
- public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
- String chrootPath,
- List<String> wtCachePaths,
- List<String> zkCachePaths)
- {
- _baseAccessor = baseAccessor;
-
- if (chrootPath == null || chrootPath.equals("/"))
- {
- _chrootPath = null;
- }
- else
- {
- PathUtils.validatePath(chrootPath);
- _chrootPath = chrootPath;
- }
-
- _wtCachePaths = wtCachePaths;
- _zkCachePaths = zkCachePaths;
-
- // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
- // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
- // comes first
- _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
- {
- @Override
- public int compare(String o1, String o2)
- {
- int len1 = o1.split("/").length;
- int len2 = o2.split("/").length;
- return len1 - len2;
- }
- });
-
- start();
- }
-
- public ZkCacheBaseDataAccessor(String zkAddress,
- ZkSerializer serializer,
- String chrootPath,
- List<String> wtCachePaths,
- List<String> zkCachePaths)
- {
- _zkclient =
- new ZkClient(zkAddress,
- ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- serializer);
- _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- TimeUnit.MILLISECONDS);
- _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
-
- if (chrootPath == null || chrootPath.equals("/"))
- {
- _chrootPath = null;
- }
- else
- {
- PathUtils.validatePath(chrootPath);
- _chrootPath = chrootPath;
- }
-
- _wtCachePaths = wtCachePaths;
- _zkCachePaths = zkCachePaths;
-
- // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
- // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
- // comes first
- _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
- {
- @Override
- public int compare(String o1, String o2)
- {
- int len1 = o1.split("/").length;
- int len2 = o2.split("/").length;
- return len1 - len2;
- }
- });
-
- start();
- }
-
- private String prependChroot(String clientPath)
- {
- if (_chrootPath != null)
- {
- // handle clientPath = "/"
- if (clientPath.length() == 1)
- {
- return _chrootPath;
- }
- return _chrootPath + clientPath;
- }
- else
- {
- return clientPath;
- }
- }
-
- private List<String> prependChroot(List<String> clientPaths)
- {
- List<String> serverPaths = new ArrayList<String>();
- for (String clientPath : clientPaths)
- {
- serverPaths.add(prependChroot(clientPath));
- }
- return serverPaths;
- }
-
- /**
- * find the first path in paths that is a descendant
- */
- private String firstCachePath(List<String> paths)
- {
- for (String cachePath : _cacheMap.keySet())
- {
- for (String path : paths)
- {
- if (path.startsWith(cachePath))
- {
- return path;
- }
- }
- }
- return null;
- }
-
- private Cache<T> getCache(String path)
- {
- for (String cachePath : _cacheMap.keySet())
- {
- if (path.startsWith(cachePath))
- {
- return _cacheMap.get(cachePath);
- }
- }
-
- return null;
- }
-
- private Cache<T> getCache(List<String> paths)
- {
- Cache<T> cache = null;
- for (String path : paths)
- {
- for (String cachePath : _cacheMap.keySet())
- {
- if (cache == null && path.startsWith(cachePath))
- {
- cache = _cacheMap.get(cachePath);
- }
- else if (cache != null && cache != _cacheMap.get(cachePath))
- {
- throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
- + paths);
- }
- }
- }
-
- return cache;
- }
-
- private void updateCache(Cache<T> cache,
- List<String> createPaths,
- boolean success,
- String updatePath,
- T data,
- Stat stat)
- {
- if (createPaths == null || createPaths.isEmpty())
- {
- if (success)
- {
- cache.update(updatePath, data, stat);
- }
- }
- else
- {
- String firstPath = firstCachePath(createPaths);
- if (firstPath != null)
- {
- cache.updateRecursive(firstPath);
- }
- }
- }
-
- @Override
- public boolean create(String path, T data, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
- List<String> pathsCreated = new ArrayList<String>();
- RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
- boolean success = (rc == RetCode.OK);
-
- updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.create(serverPath, data, options);
- }
-
- @Override
- public boolean set(String path, T data, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
- Stat setStat = new Stat();
- List<String> pathsCreated = new ArrayList<String>();
- boolean success =
- _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
-
- updateCache(cache, pathsCreated, success, serverPath, data, setStat);
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.set(serverPath, data, options);
- }
-
- @Override
- public boolean update(String path, DataUpdater<T> updater, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
-
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
- Stat setStat = new Stat();
- List<String> pathsCreated = new ArrayList<String>();
- T updateData =
- _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
- boolean success = (updateData != null);
- updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
- // return _baseAccessor.update(serverPath, updater, options);
- }
-
- @Override
- public boolean exists(String path, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- boolean exists = cache.exists(serverPath);
- if (exists)
- {
- return true;
- }
- }
-
- // if not exists in cache, always fall back to zk
- return _baseAccessor.exists(serverPath, options);
- }
-
- @Override
- public boolean remove(String path, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
-
- boolean success = _baseAccessor.remove(serverPath, options);
- if (success)
- {
- cache.purgeRecursive(serverPath);
- }
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.remove(serverPath, options);
- }
-
- @Override
- public T get(String path, Stat stat, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- T record = null;
- ZNode znode = cache.get(serverPath);
-
- if (znode != null)
- {
- // TODO: shall return a deep copy instead of reference
- record = ((T) znode.getData());
- if (stat != null)
- {
- DataTree.copyStat(znode.getStat(), stat);
- }
- return record;
-
- }
- else
- {
- // if cache miss, fall back to zk and update cache
- try
- {
- cache.lockWrite();
- record = _baseAccessor.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
- cache.update(serverPath, record, stat);
- }
- catch (ZkNoNodeException e)
- {
- if (AccessOption.isThrowExceptionIfNotExist(options))
- {
- throw e;
- }
- }
- finally
- {
- cache.unlockWrite();
- }
-
- return record;
- }
- }
-
- // no cache
- return _baseAccessor.get(serverPath, stat, options);
- }
-
- @Override
- public Stat getStat(String path, int options)
- {
- String clientPath = path;
- String serverPath = prependChroot(clientPath);
-
- Cache<T> cache = getCache(serverPath);
- if (cache != null)
- {
- Stat stat = new Stat();
- ZNode znode = cache.get(serverPath);
-
- if (znode != null)
- {
- return znode.getStat();
-
- }
- else
- {
- // if cache miss, fall back to zk and update cache
- try
- {
- cache.lockWrite();
- T data = _baseAccessor.get(serverPath, stat, options);
- cache.update(serverPath, data, stat);
- }
- catch (ZkNoNodeException e)
- {
- return null;
- }
- finally
- {
- cache.unlockWrite();
- }
-
- return stat;
- }
- }
-
- // no cache
- return _baseAccessor.getStat(serverPath, options);
- }
-
- @Override
- public boolean[] createChildren(List<String> paths, List<T> records, int options)
- {
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- Cache<T> cache = getCache(serverPaths);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
- boolean[] needCreate = new boolean[size];
- Arrays.fill(needCreate, true);
- List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
- CreateCallbackHandler[] createCbList =
- _baseAccessor.create(serverPaths,
- records,
- needCreate,
- pathsCreatedList,
- options);
-
- boolean[] success = new boolean[size];
- for (int i = 0; i < size; i++)
- {
- CreateCallbackHandler cb = createCbList[i];
- success[i] = (Code.get(cb.getRc()) == Code.OK);
-
- updateCache(cache,
- pathsCreatedList.get(i),
- success[i],
- serverPaths.get(i),
- records.get(i),
- ZNode.ZERO_STAT);
- }
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.createChildren(serverPaths, records, options);
- }
-
- @Override
- public boolean[] setChildren(List<String> paths, List<T> records, int options)
- {
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- Cache<T> cache = getCache(serverPaths);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
- List<Stat> setStats = new ArrayList<Stat>();
- List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
- boolean[] success =
- _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
-
- for (int i = 0; i < size; i++)
- {
- updateCache(cache,
- pathsCreatedList.get(i),
- success[i],
- serverPaths.get(i),
- records.get(i),
- setStats.get(i));
- }
-
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- return _baseAccessor.setChildren(serverPaths, records, options);
- }
-
- @Override
- public boolean[] updateChildren(List<String> paths,
- List<DataUpdater<T>> updaters,
- int options)
- {
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- Cache<T> cache = getCache(serverPaths);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
-
- List<Stat> setStats = new ArrayList<Stat>();
- boolean[] success = new boolean[size];
- List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
- List<T> updateData =
- _baseAccessor.update(serverPaths,
- updaters,
- pathsCreatedList,
- setStats,
- options);
-
- // System.out.println("updateChild: ");
- // for (T data : updateData)
- // {
- // System.out.println(data);
- // }
-
- for (int i = 0; i < size; i++)
- {
- success[i] = (updateData.get(i) != null);
- updateCache(cache,
- pathsCreatedList.get(i),
- success[i],
- serverPaths.get(i),
- updateData.get(i),
- setStats.get(i));
- }
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.updateChildren(serverPaths, updaters, options);
- }
-
- // TODO: change to use async_exists
- @Override
- public boolean[] exists(List<String> paths, int options)
- {
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- boolean exists[] = new boolean[size];
- for (int i = 0; i < size; i++)
- {
- exists[i] = exists(serverPaths.get(i), options);
- }
- return exists;
- }
-
- @Override
- public boolean[] remove(List<String> paths, int options)
- {
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- Cache<T> cache = getCache(serverPaths);
- if (cache != null)
- {
- try
- {
- cache.lockWrite();
-
- boolean[] success = _baseAccessor.remove(serverPaths, options);
-
- for (int i = 0; i < size; i++)
- {
- if (success[i])
- {
- cache.purgeRecursive(serverPaths.get(i));
- }
- }
- return success;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- // no cache
- return _baseAccessor.remove(serverPaths, options);
- }
-
- @Override
- public List<T> get(List<String> paths, List<Stat> stats, int options)
- {
- if (paths == null || paths.isEmpty())
- {
- return Collections.emptyList();
- }
-
- final int size = paths.size();
- List<String> serverPaths = prependChroot(paths);
-
- List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
- List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
-
- boolean needRead = false;
- boolean needReads[] = new boolean[size]; // init to false
-
- Cache<T> cache = getCache(serverPaths);
- if (cache != null)
- {
- try
- {
- cache.lockRead();
- for (int i = 0; i < size; i++)
- {
- ZNode zNode = cache.get(serverPaths.get(i));
- if (zNode != null)
- {
- // TODO: shall return a deep copy instead of reference
- records.set(i, (T) zNode.getData());
- readStats.set(i, zNode.getStat());
- }
- else
- {
- needRead = true;
- needReads[i] = true;
- }
- }
- }
- finally
- {
- cache.unlockRead();
- }
-
- // cache miss, fall back to zk and update cache
- if (needRead)
- {
- cache.lockWrite();
- try
- {
- List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
- for (int i = 0; i < size; i++)
- {
- if (needReads[i])
- {
- records.set(i, readRecords.get(i));
- cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
- }
- }
- }
- finally
- {
- cache.unlockWrite();
- }
- }
-
- if (stats != null)
- {
- stats.clear();
- stats.addAll(readStats);
- }
-
- return records;
- }
-
- // no cache
- return _baseAccessor.get(serverPaths, stats, options);
- }
-
- // TODO: add cache
- @Override
- public Stat[] getStats(List<String> paths, int options)
- {
- List<String> serverPaths = prependChroot(paths);
- return _baseAccessor.getStats(serverPaths, options);
- }
-
- @Override
- public List<String> getChildNames(String parentPath, int options)
- {
- String serverParentPath = prependChroot(parentPath);
-
- Cache<T> cache = getCache(serverParentPath);
- if (cache != null)
- {
- // System.out.println("zk-cache");
- ZNode znode = cache.get(serverParentPath);
-
- if (znode != null && znode.getChildSet() != Collections.<String> emptySet())
- {
- // System.out.println("zk-cache-hit: " + parentPath);
- List<String> childNames = new ArrayList<String>(znode.getChildSet());
- Collections.sort(childNames);
- return childNames;
- }
- else
- {
- // System.out.println("zk-cache-miss");
- try
- {
- cache.lockWrite();
-
- List<String> childNames =
- _baseAccessor.getChildNames(serverParentPath, options);
- // System.out.println("\t--" + childNames);
- cache.addToParentChildSet(serverParentPath, childNames);
-
- return childNames;
- }
- finally
- {
- cache.unlockWrite();
- }
- }
- }
-
- // no cache
- return _baseAccessor.getChildNames(serverParentPath, options);
- }
-
- @Override
- public List<T> getChildren(String parentPath, List<Stat> stats, int options)
- {
- List<String> childNames = getChildNames(parentPath, options);
- if (childNames == null)
- {
- return null;
- }
-
- List<String> paths = new ArrayList<String>();
- for (String childName : childNames)
- {
- String path = parentPath + "/" + childName;
- paths.add(path);
- }
-
- return get(paths, stats, options);
- }
-
- @Override
- public void subscribeDataChanges(String path, IZkDataListener listener)
- {
- String serverPath = prependChroot(path);
-
- _baseAccessor.subscribeDataChanges(serverPath, listener);
- }
-
- @Override
- public void unsubscribeDataChanges(String path, IZkDataListener listener)
- {
- String serverPath = prependChroot(path);
-
- _baseAccessor.unsubscribeDataChanges(serverPath, listener);
- }
-
- @Override
- public List<String> subscribeChildChanges(String path, IZkChildListener listener)
- {
- String serverPath = prependChroot(path);
-
- return _baseAccessor.subscribeChildChanges(serverPath, listener);
- }
-
- @Override
- public void unsubscribeChildChanges(String path, IZkChildListener listener)
- {
- String serverPath = prependChroot(path);
-
- _baseAccessor.unsubscribeChildChanges(serverPath, listener);
- }
-
- @Override
- public void subscribe(String parentPath, HelixPropertyListener listener)
- {
- String serverPath = prependChroot(parentPath);
- _zkCache.subscribe(serverPath, listener);
- }
-
- @Override
- public void unsubscribe(String parentPath, HelixPropertyListener listener)
- {
- String serverPath = prependChroot(parentPath);
- _zkCache.unsubscribe(serverPath, listener);
- }
-
- @Override
- public void start()
- {
-
- LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
- + ", " + _zkCachePaths);
-
- // start event thread
- try
- {
- _eventLock.lockInterruptibly();
- if (_eventThread != null)
- {
- LOG.warn(_eventThread + " has already started");
- }
- else
- {
-
- if (_zkCachePaths == null || _zkCachePaths.isEmpty())
- {
- LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
- }
- else
- {
- LOG.debug("Starting ZkCacheEventThread...");
-
- _eventThread = new ZkCacheEventThread("");
- _eventThread.start();
- }
- }
- }
- catch (InterruptedException e)
- {
- LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
- }
- finally
- {
- _eventLock.unlock();
- }
- LOG.debug("Start ZkCacheEventThread...done");
-
- _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
- _zkCache =
- new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
-
- if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
- {
- for (String path : _wtCachePaths)
- {
- _cacheMap.put(path, _wtCache);
- }
- }
-
- if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
- {
- for (String path : _zkCachePaths)
- {
- _cacheMap.put(path, _zkCache);
- }
- }
- }
-
- @Override
- public void stop()
- {
- try
- {
- _eventLock.lockInterruptibly();
-
- if (_zkclient != null)
- {
- _zkclient.close();
- _zkclient = null;
- }
-
- if (_eventThread == null)
- {
- LOG.warn(_eventThread + " has already stopped");
- return;
- }
-
- LOG.debug("Stopping ZkCacheEventThread...");
- _eventThread.interrupt();
- _eventThread.join(2000);
- _eventThread = null;
- }
- catch (InterruptedException e)
- {
- LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
- }
- finally
- {
- _eventLock.unlock();
- }
-
- LOG.debug("Stop ZkCacheEventThread...done");
-
- }
-
- @Override
- public void reset()
- {
- if (_wtCache != null)
- {
- _wtCache.reset();
- }
-
- if (_zkCache != null)
- {
- _zkCache.reset();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
deleted file mode 100644
index 4d3dc21..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCacheEventThread.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.log4j.Logger;
-
-// copy from ZkEventThread
-public class ZkCacheEventThread extends Thread
-{
-
- private static final Logger LOG =
- Logger.getLogger(ZkCacheEventThread.class);
- private final BlockingQueue<ZkCacheEvent> _events = new LinkedBlockingQueue<ZkCacheEvent>();
- private static AtomicInteger _eventId = new AtomicInteger(0);
-
- static abstract class ZkCacheEvent
- {
-
- private final String _description;
-
- public ZkCacheEvent(String description)
- {
- _description = description;
- }
-
- public abstract void run() throws Exception;
-
- @Override
- public String toString()
- {
- return "ZkCacheEvent[" + _description + "]";
- }
- }
-
- ZkCacheEventThread(String name)
- {
- setDaemon(true);
- setName("ZkCache-EventThread-" + getId() + "-" + name);
- }
-
- @Override
- public void run()
- {
- LOG.info("Starting ZkCache event thread.");
- try
- {
- while (!isInterrupted())
- {
- ZkCacheEvent zkEvent = _events.take();
- int eventId = _eventId.incrementAndGet();
- LOG.debug("Delivering event #" + eventId + " " + zkEvent);
- try
- {
- zkEvent.run();
- }
- catch (InterruptedException e)
- {
- interrupt();
- }
- catch (ZkInterruptedException e)
- {
- interrupt();
- }
- catch (Throwable e)
- {
- LOG.error("Error handling event " + zkEvent, e);
- }
- LOG.debug("Delivering event #" + eventId + " done");
- }
- }
- catch (InterruptedException e)
- {
- LOG.info("Terminate ZkClient event thread.");
- }
- }
-
- public void send(ZkCacheEvent event)
- {
- if (!isInterrupted())
- {
- LOG.debug("New event: " + event);
- _events.add(event);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
deleted file mode 100644
index aa3c559..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkCallbackCache.java
+++ /dev/null
@@ -1,348 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class ZkCallbackCache<T> extends Cache<T> implements
- IZkChildListener,
- IZkDataListener,
- IZkStateListener
-{
- private static Logger LOG = Logger.getLogger(ZkCallbackCache.class);
-
- final BaseDataAccessor<T> _accessor;
- final String _chrootPath;
-
- private final ZkCacheEventThread _eventThread;
- private final Map<String, Set<HelixPropertyListener>> _listener;
-
- public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath,
- List<String> paths, ZkCacheEventThread eventThread)
- {
- super();
- _accessor = accessor;
- _chrootPath = chrootPath;
-
- _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
- _eventThread = eventThread;
-
- // init cache
- // System.out.println("init cache: " + paths);
- if (paths != null && !paths.isEmpty())
- {
- for (String path : paths)
- {
- updateRecursive(path);
- }
- }
- }
-
- @Override
- public void update(String path, T data, Stat stat)
- {
- String parentPath = new File(path).getParent();
- String childName = new File(path).getName();
-
- addToParentChildSet(parentPath, childName);
- ZNode znode = _cache.get(path);
- if (znode == null)
- {
- _cache.put(path, new ZNode(path, data, stat));
- fireEvents(path, EventType.NodeCreated);
- }
- else
- {
- Stat oldStat = znode.getStat();
-
- znode.setData(data);
- znode.setStat(stat);
- // System.out.println("\t\t--setData. path: " + path + ", data: " + data);
-
- if (oldStat.getCzxid() != stat.getCzxid())
- {
- fireEvents(path, EventType.NodeDeleted);
- fireEvents(path, EventType.NodeCreated);
- }
- else if (oldStat.getVersion() != stat.getVersion())
- {
- // System.out.println("\t--fireNodeChanged: " + path + ", oldVersion: " +
- // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
- fireEvents(path, EventType.NodeDataChanged);
- }
- }
- }
-
- // TODO: make readData async
- @Override
- public void updateRecursive(String path)
- {
- if (path == null)
- {
- return;
- }
-
- try
- {
- _lock.writeLock().lock();
- try
- {
- // subscribe changes before read
- _accessor.subscribeDataChanges(path, this);
-
- // update this node
- Stat stat = new Stat();
- T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
- update(path, readData, stat);
- }
- catch (ZkNoNodeException e)
- {
- // OK. znode not exists
- // we still need to subscribe child change
- }
-
- // recursively update children nodes if not exists
- // System.out.println("subcribeChildChange: " + path);
- ZNode znode = _cache.get(path);
- List<String> childNames = _accessor.subscribeChildChanges(path, this);
- if (childNames != null && !childNames.isEmpty())
- {
- for (String childName : childNames)
- {
- if (!znode.hasChild(childName))
- {
- String childPath = path + "/" + childName;
- znode.addChild(childName);
- updateRecursive(childPath);
- }
- }
- }
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
- {
- // System.out.println("handleChildChange: " + parentPath + ", " + currentChilds);
-
- // this is invoked if subscribed for childChange and node gets deleted
- if (currentChilds == null)
- {
- return;
- }
-
- updateRecursive(parentPath);
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception
- {
- // System.out.println("handleDataChange: " + dataPath);
- try
- {
- _lock.writeLock().lock();
-
- // TODO: optimize it by get stat from callback
- Stat stat = new Stat();
- Object readData =
- _accessor.get(dataPath, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
- ZNode znode = _cache.get(dataPath);
- if (znode != null)
- {
- Stat oldStat = znode.getStat();
-
- // System.out.println("handleDataChange: " + dataPath + ", data: " + data);
- // System.out.println("handleDataChange: " + dataPath + ", oldCzxid: " +
- // oldStat.getCzxid() + ", newCzxid: " + stat.getCzxid()
- // + ", oldVersion: " + oldStat.getVersion() + ", newVersion: " +
- // stat.getVersion());
- znode.setData(readData);
- znode.setStat(stat);
-
- // if create right after delete, and zkCallback comes after create
- // no DataDelete() will be fired, instead will fire 2 DataChange()
- // see ZkClient.fireDataChangedEvents()
- if (oldStat.getCzxid() != stat.getCzxid())
- {
- fireEvents(dataPath, EventType.NodeDeleted);
- fireEvents(dataPath, EventType.NodeCreated);
- }
- else if (oldStat.getVersion() != stat.getVersion())
- {
- // System.out.println("\t--fireNodeChanged: " + dataPath + ", oldVersion: " +
- // oldStat.getVersion() + ", newVersion: " + stat.getVersion());
- fireEvents(dataPath, EventType.NodeDataChanged);
- }
- }
- else
- {
- // we may see dataChange on child before childChange on parent
- // in this case, let childChange update cache
- }
- }
- finally
- {
- _lock.writeLock().unlock();
- }
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception
- {
- // System.out.println("handleDataDeleted: " + dataPath);
-
- try
- {
- _lock.writeLock().lock();
- _accessor.unsubscribeDataChanges(dataPath, this);
- _accessor.unsubscribeChildChanges(dataPath, this);
-
- String parentPath = new File(dataPath).getParent();
- String name = new File(dataPath).getName();
- removeFromParentChildSet(parentPath, name);
- _cache.remove(dataPath);
-
- fireEvents(dataPath, EventType.NodeDeleted);
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void handleNewSession() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void subscribe(String path, HelixPropertyListener listener)
- {
- synchronized (_listener)
- {
- Set<HelixPropertyListener> listeners = _listener.get(path);
- if (listeners == null)
- {
- listeners = new CopyOnWriteArraySet<HelixPropertyListener>();
- _listener.put(path, listeners);
- }
- listeners.add(listener);
- }
- }
-
- public void unsubscribe(String path, HelixPropertyListener childListener)
- {
- synchronized (_listener)
- {
- final Set<HelixPropertyListener> listeners = _listener.get(path);
- if (listeners != null)
- {
- listeners.remove(childListener);
- }
- }
- }
-
- private void fireEvents(final String path, EventType type)
- {
- String tmpPath = path;
- final String clientPath =
- (_chrootPath == null ? path : (_chrootPath.equals(path) ? "/"
- : path.substring(_chrootPath.length())));
-
- while (tmpPath != null)
- {
- Set<HelixPropertyListener> listeners = _listener.get(tmpPath);
-
- if (listeners != null && !listeners.isEmpty())
- {
- for (final HelixPropertyListener listener : listeners)
- {
- try
- {
- switch (type)
- {
- case NodeDataChanged:
- // listener.onDataChange(path);
- _eventThread.send(new ZkCacheEvent("dataChange on " + path + " send to "
- + listener)
- {
- @Override
- public void run() throws Exception
- {
- listener.onDataChange(clientPath);
- }
- });
- break;
- case NodeCreated:
- // listener.onDataCreate(path);
- _eventThread.send(new ZkCacheEvent("dataCreate on " + path + " send to "
- + listener)
- {
- @Override
- public void run() throws Exception
- {
- listener.onDataCreate(clientPath);
- }
- });
- break;
- case NodeDeleted:
- // listener.onDataDelete(path);
- _eventThread.send(new ZkCacheEvent("dataDelete on " + path + " send to "
- + listener)
- {
- @Override
- public void run() throws Exception
- {
- listener.onDataDelete(clientPath);
- }
- });
- break;
- default:
- break;
- }
- }
- catch (Exception e)
- {
- LOG.error("Exception in handle events.", e);
- }
- }
- }
-
- tmpPath = new File(tmpPath).getParent();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
deleted file mode 100644
index 8912154..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkClient.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.SerializableSerializer;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
-import com.linkedin.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
-
-/**
- * ZKClient does not provide some functionalities, this will be used for quick fixes if
- * any bug found in ZKClient or if we need additional features but can't wait for the new
- * ZkClient jar Ideally we should commit the changes we do here to ZKClient.
- *
- * @author kgopalak
- *
- */
-
-public class ZkClient extends org.I0Itec.zkclient.ZkClient
-{
- private static Logger LOG = Logger.getLogger(ZkClient.class);
- public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
- public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
- // public static String sessionId;
- // public static String sessionPassword;
-
- private PathBasedZkSerializer _zkSerializer;
-
- public ZkClient(IZkConnection connection, int connectionTimeout,
- PathBasedZkSerializer zkSerializer)
- {
- super(connection, connectionTimeout, new ByteArraySerializer());
- _zkSerializer = zkSerializer;
-
- StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- LOG.info("create a new zkclient. " + Arrays.asList(calls));
- }
-
- public ZkClient(IZkConnection connection, int connectionTimeout,
- ZkSerializer zkSerializer)
- {
- this(connection, connectionTimeout, new BasicZkSerializer(zkSerializer));
- }
-
- public ZkClient(IZkConnection connection, int connectionTimeout)
- {
- this(connection, connectionTimeout, new SerializableSerializer());
- }
-
- public ZkClient(IZkConnection connection)
- {
- this(connection, Integer.MAX_VALUE, new SerializableSerializer());
- }
-
- public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- ZkSerializer zkSerializer)
- {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
- }
-
- public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- PathBasedZkSerializer zkSerializer)
- {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
- }
-
- public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
- {
- this(new ZkConnection(zkServers, sessionTimeout),
- connectionTimeout,
- new SerializableSerializer());
- }
-
- public ZkClient(String zkServers, int connectionTimeout)
- {
- this(new ZkConnection(zkServers), connectionTimeout, new SerializableSerializer());
- }
-
- public ZkClient(String zkServers)
- {
- this(new ZkConnection(zkServers), Integer.MAX_VALUE, new SerializableSerializer());
- }
-
- {
- }
-
- @Override
- public void setZkSerializer(ZkSerializer zkSerializer)
- {
- _zkSerializer = new BasicZkSerializer(zkSerializer);
- }
-
- public void setZkSerializer(PathBasedZkSerializer zkSerializer)
- {
- _zkSerializer = zkSerializer;
- }
-
- public IZkConnection getConnection()
- {
- return _connection;
- }
-
- @Override
- public void close() throws ZkInterruptedException
- {
- StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- LOG.info("closing a zkclient. zookeeper: "
- + (_connection == null ? "null" : ((ZkConnection) _connection).getZookeeper())
- + ", callStack: " + Arrays.asList(calls));
-
- super.close();
- }
-
- public Stat getStat(final String path)
- {
- long startT = System.nanoTime();
-
- try
- {
- Stat stat = retryUntilConnected(new Callable<Stat>()
- {
-
- @Override
- public Stat call() throws Exception
- {
- Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
- return stat;
- }
- });
-
- return stat;
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- // override exists(path, watch), so we can record all exists requests
- @Override
- protected boolean exists(final String path, final boolean watch)
- {
- long startT = System.nanoTime();
-
- try
- {
- return retryUntilConnected(new Callable<Boolean>()
- {
- @Override
- public Boolean call() throws Exception
- {
- return _connection.exists(path, watch);
- }
- });
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("exists, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- // override getChildren(path, watch), so we can record all getChildren requests
- @Override
- protected List<String> getChildren(final String path, final boolean watch)
- {
- long startT = System.nanoTime();
-
- try
- {
- return retryUntilConnected(new Callable<List<String>>()
- {
- @Override
- public List<String> call() throws Exception
- {
- return _connection.getChildren(path, watch);
- }
- });
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("getChildren, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T extends Object> T deserialize(byte[] data, String path)
- {
- if (data == null)
- {
- return null;
- }
- return (T) _zkSerializer.deserialize(data, path);
- }
-
- // override readData(path, stat, watch), so we can record all read requests
- @Override
- @SuppressWarnings("unchecked")
- protected <T extends Object> T readData(final String path,
- final Stat stat,
- final boolean watch)
- {
- long startT = System.nanoTime();
- try
- {
- byte[] data = retryUntilConnected(new Callable<byte[]>()
- {
-
- @Override
- public byte[] call() throws Exception
- {
- return _connection.readData(path, stat, watch);
- }
- });
- return (T) deserialize(data, path);
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("getData, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T extends Object> T readDataAndStat(String path,
- Stat stat,
- boolean returnNullIfPathNotExists)
- {
- T data = null;
- try
- {
- data = (T) super.readData(path, stat);
- }
- catch (ZkNoNodeException e)
- {
- if (!returnNullIfPathNotExists)
- {
- throw e;
- }
- }
- return data;
- }
-
- public String getServers()
- {
- return _connection.getServers();
- }
-
- public byte[] serialize(Object data, String path)
- {
- return _zkSerializer.serialize(data, path);
- }
-
- @Override
- public void writeData(final String path, Object datat, final int expectedVersion)
- {
- long startT = System.nanoTime();
- try
- {
- final byte[] data = serialize(datat, path);
-
- retryUntilConnected(new Callable<Object>()
- {
-
- @Override
- public Object call() throws Exception
- {
- _connection.writeData(path, data, expectedVersion);
- return null;
- }
- });
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("setData, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) throws InterruptedException
- {
- Stat stat = null;
- long start = System.nanoTime();
- try
- {
- byte[] bytes = _zkSerializer.serialize(datat, path);
- stat =
- ((ZkConnection) _connection).getZookeeper().setData(path,
- bytes,
- expectedVersion);
- return stat;
- }
- catch (KeeperException e)
- {
- throw ZkException.create(e);
- }
- finally
- {
- long end = System.nanoTime();
- LOG.info("setData, path: " + path + ", time: " + (end - start) + " ns");
- }
- }
-
- @Override
- public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException,
- IllegalArgumentException,
- ZkException,
- RuntimeException
- {
- if (path == null)
- {
- throw new NullPointerException("path must not be null.");
- }
-
- long startT = System.nanoTime();
- try
- {
- final byte[] bytes = data == null ? null : serialize(data, path);
-
- return retryUntilConnected(new Callable<String>()
- {
-
- @Override
- public String call() throws Exception
- {
- return _connection.create(path, bytes, mode);
- }
- });
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("create, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- @Override
- public boolean delete(final String path)
- {
- long startT = System.nanoTime();
- try
- {
- try
- {
- retryUntilConnected(new Callable<Object>()
- {
-
- @Override
- public Object call() throws Exception
- {
- _connection.delete(path);
- return null;
- }
- });
-
- return true;
- }
- catch (ZkNoNodeException e)
- {
- return false;
- }
- }
- finally
- {
- long endT = System.nanoTime();
- LOG.info("delete, path: " + path + ", time: " + (endT - startT) + " ns");
- }
- }
-
- public void asyncCreate(final String path,
- Object datat,
- CreateMode mode,
- CreateCallbackHandler cb)
- {
- byte[] data = null;
- if (datat != null)
- {
- data = serialize(datat, path);
- }
- ((ZkConnection) _connection).getZookeeper().create(path, data, Ids.OPEN_ACL_UNSAFE, // Arrays.asList(DEFAULT_ACL),
- mode,
- cb,
- null);
- }
-
- public void asyncSetData(final String path,
- Object datat,
- int version,
- SetDataCallbackHandler cb)
- {
- final byte[] data = serialize(datat, path);
- ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, null);
-
- }
-
- public void asyncGetData(final String path, GetDataCallbackHandler cb)
- {
- ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, null);
- }
-
- public void asyncExists(final String path, ExistsCallbackHandler cb)
- {
- ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, null);
-
- }
-
- public void asyncDelete(String path, DeleteCallbackHandler cb)
- {
- ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
deleted file mode 100644
index 2c4f269..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkStateChangeListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class ZkStateChangeListener implements IZkStateListener
-{
- private volatile boolean _isConnected;
- private volatile boolean _hasSessionExpired;
- private final ZKHelixManager _zkHelixManager;
-
- private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
-
- public ZkStateChangeListener(ZKHelixManager zkHelixManager)
- {
- this._zkHelixManager = zkHelixManager;
-
- }
-
- @Override
- public void handleNewSession()
- {
- // TODO:bug in zkclient .
- // zkclient does not invoke handleStateChanged when a session expires but
- // directly invokes handleNewSession
- _isConnected = true;
- _hasSessionExpired = false;
- _zkHelixManager.handleNewSession();
- }
-
- @Override
- public void handleStateChanged(KeeperState keeperState) throws Exception
- {
- switch (keeperState)
- {
- case SyncConnected:
- ZkConnection zkConnection =
- ((ZkConnection) _zkHelixManager._zkClient.getConnection());
- logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
- _isConnected = true;
- break;
- case Disconnected:
- logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
- + _zkHelixManager._sessionId + ", instance: "
- + _zkHelixManager.getInstanceName() + ", type: "
- + _zkHelixManager.getInstanceType());
-
- _isConnected = false;
- break;
- case Expired:
- logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
- + _zkHelixManager._sessionId + ", instance: "
- + _zkHelixManager.getInstanceName() + ", type: "
- + _zkHelixManager.getInstanceType());
-
- _isConnected = false;
- _hasSessionExpired = true;
- break;
- }
- }
-
- boolean isConnected()
- {
- return _isConnected;
- }
-
- void disconnect()
- {
- _isConnected = false;
- }
-
- boolean hasSessionExpired()
- {
- return _hasSessionExpired;
- }
-}