You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[20/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
new file mode 100644
index 0000000..f69dbfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
@@ -0,0 +1,293 @@
+package org.apache.helix.store.file;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+public class FileHelixPropertyStore<T> implements HelixPropertyStore<T>
+{
+ final FilePropertyStore<T> _store;
+
+ public FileHelixPropertyStore(final PropertySerializer<T> serializer,
+ String rootNamespace,
+ final PropertyJsonComparator<T> comparator)
+ {
+ _store = new FilePropertyStore<T>(serializer, rootNamespace, comparator);
+ }
+
+ @Override
+ public boolean create(String path, T record, int options)
+ {
+ return set(path, record, options);
+ }
+
+ @Override
+ public boolean set(String path, T record, int options)
+ {
+ try
+ {
+ _store.setProperty(path, record);
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ @Override
+ public boolean update(String path, DataUpdater<T> updater, int options)
+ {
+ _store.updatePropertyUntilSucceed(path, updater);
+ return true;
+ }
+
+ @Override
+ public boolean remove(String path, int options)
+ {
+ try
+ {
+ _store.removeProperty(path);
+ return true;
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options)
+ {
+ return setChildren(paths, records, options);
+ }
+
+ @Override
+ public boolean[] setChildren(List<String> paths, List<T> records, int options)
+ {
+ boolean[] success = new boolean[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ success[i] = create(paths.get(i), records.get(i), options);
+ }
+ return success;
+ }
+
+ @Override
+ public boolean[] updateChildren(List<String> paths,
+ List<DataUpdater<T>> updaters,
+ int options)
+ {
+ boolean[] success = new boolean[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ success[i] = update(paths.get(i), updaters.get(i), options);
+ }
+ return success;
+
+ }
+
+ @Override
+ public boolean[] remove(List<String> paths, int options)
+ {
+ boolean[] success = new boolean[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ success[i] = remove(paths.get(i), options);
+ }
+ return success;
+ }
+
+ @Override
+ public T get(String path, Stat stat, int options)
+ {
+ PropertyStat propertyStat = new PropertyStat();
+ try
+ {
+ T value = _store.getProperty(path, propertyStat);
+ if (stat != null)
+ {
+ stat.setVersion(propertyStat.getVersion());
+ stat.setMtime(propertyStat.getLastModifiedTime());
+ }
+ return value;
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public List<T> get(List<String> paths, List<Stat> stats, int options)
+ {
+ List<T> values = new ArrayList<T>();
+ for (int i = 0; i < paths.size(); i++)
+ {
+ values.add(get(paths.get(i), stats.get(i), options));
+ }
+ return values;
+ }
+
+ @Override
+ public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+ {
+ List<String> childNames = getChildNames(parentPath, options);
+
+ if (childNames == null)
+ {
+ return null;
+ }
+
+ List<String> paths = new ArrayList<String>();
+ for (String childName : childNames)
+ {
+ String path = parentPath + "/" + childName;
+ paths.add(path);
+ }
+
+ return get(paths, stats, options);
+ }
+
+ @Override
+ public List<String> getChildNames(String parentPath, int options)
+ {
+ try
+ {
+ return _store.getPropertyNames(parentPath);
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean exists(String path, int options)
+ {
+ return _store.exists(path);
+ }
+
+ @Override
+ public boolean[] exists(List<String> paths, int options)
+ {
+ boolean[] exists = new boolean[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ exists[i] = exists(paths.get(i), options);
+ }
+ return exists;
+ }
+
+ @Override
+ public Stat[] getStats(List<String> paths, int options)
+ {
+ Stat[] stats = new Stat[paths.size()];
+ for (int i = 0; i < paths.size(); i++)
+ {
+ stats[i] = getStat(paths.get(i), options);
+ }
+ return stats;
+ }
+
+ @Override
+ public Stat getStat(String path, int options)
+ {
+ Stat stat = new Stat();
+ get(path, stat, options);
+ return stat;
+ }
+
+ @Override
+ public void subscribeDataChanges(String path, IZkDataListener listener)
+ {
+ throw new UnsupportedOperationException("subscribeDataChanges not supported");
+ }
+
+ @Override
+ public void unsubscribeDataChanges(String path, IZkDataListener listener)
+ {
+ throw new UnsupportedOperationException("unsubscribeDataChanges not supported");
+ }
+
+ @Override
+ public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+ {
+ throw new UnsupportedOperationException("subscribeChildChanges not supported");
+ }
+
+ @Override
+ public void unsubscribeChildChanges(String path, IZkChildListener listener)
+ {
+ throw new UnsupportedOperationException("unsubscribeChildChanges not supported");
+ }
+
+ @Override
+ public void start()
+ {
+ _store.start();
+ }
+
+ @Override
+ public void stop()
+ {
+ _store.stop();
+ }
+
+ @Override
+ public void subscribe(String parentPath, final HelixPropertyListener listener)
+ {
+ try
+ {
+ _store.subscribeForPropertyChange(parentPath, new PropertyChangeListener<T>()
+ {
+
+ @Override
+ public void onPropertyChange(String key)
+ {
+ listener.onDataChange(key);
+ }
+ });
+ }
+ catch (PropertyStoreException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void unsubscribe(String parentPath, HelixPropertyListener listener)
+ {
+ throw new UnsupportedOperationException("unsubscribe not supported");
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
new file mode 100644
index 0000000..1b50613
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
@@ -0,0 +1,942 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.io.DirectoryWalker;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.manager.file.FileCallbackHandler;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.log4j.Logger;
+
+
+/**
+ *
+ * A property store built on top of a file system.
+ * Since file systems usually have sophisticated caching mechanisms,
+ * the file property store does not need another cache of its own.
+ *
+ * NOTES:
+ * The lastModified timestamp provided by Java file I/O has only second-level precision,
+ * so a file may be modified without its lastModified timestamp changing.
+ * The solution is to use a map that caches the files changed in the last second
+ * and, in the next round of refresh, to check against this map to figure out whether a file
+ * has been changed or created in the last second.
+ * @link http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6939260
+ *
+ * @author zzhang
+ * @param <T>
+ */
+
+public class FilePropertyStore<T> implements PropertyStore<T>
+{
+ private static Logger logger = Logger.getLogger(FilePropertyStore.class);
+
+ private final String ROOT = "/";
+ private final long TIMEOUT = 30L;
+ private final long REFRESH_PERIOD = 1000; // ms
+ private final int _id = new Random().nextInt();
+ private final String _rootNamespace;
+ private PropertySerializer<T> _serializer;
+ private final PropertyJsonComparator<T> _comparator;
+
+ private Thread _refreshThread;
+ private final AtomicBoolean _stopRefreshThread;
+ private final CountDownLatch _firstRefreshCounter;
+ private final ReadWriteLock _readWriteLock;
+
+ private Map<String, T> _lastModifiedFiles = new HashMap<String, T>();
+ private Map<String, T> _curModifiedFiles = new HashMap<String, T>();
+
+ private final Map< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > _fileChangeListeners; // map key to listener
+
+ private class FilePropertyStoreRefreshThread implements Runnable
+ {
+ private final PropertyStoreDirWalker _dirWalker;
+
+ public class PropertyStoreDirWalker extends DirectoryWalker
+ {
+ private final File _propertyStoreRootDir;
+ private long _lastNotifiedTime = 0;
+ private long _currentHighWatermark;
+
+ /**
+  * Creates a walker rooted at the given directory.
+  *
+  * @param rootNamespace path of the directory to walk
+  * @param readWriteLock not used by this constructor
+  */
+ public PropertyStoreDirWalker(String rootNamespace, ReadWriteLock readWriteLock)
+ {
+   super();
+   this._propertyStoreRootDir = new File(rootNamespace);
+ }
+
+ /**
+ * Walker callback for a regular file: decides whether the file counts as
+ * changed since the previous walk and, if so, adds it to <code>results</code>.
+ *
+ * @param file    file being visited
+ * @param depth   not used by this method
+ * @param results collection that receives the files considered changed
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected void handleFile(File file, int depth, Collection results) throws IOException
+ {
+ // older than the timestamp recorded by the previous walk: nothing to report
+ if (file.lastModified() < _lastNotifiedTime)
+ {
+ return;
+ }
+
+ String path = getRelativePath(file.getAbsolutePath());
+ T newValue = null;
+ try
+ {
+ newValue = getProperty(path);
+ } catch (PropertyStoreException e)
+ {
+ // newValue stays null when the read fails
+ logger.error("fail to get property, path:" + path, e);
+ }
+
+ // same timestamp as the previous walk and the path was recorded then:
+ // compare the content to tell whether it really changed
+ if (file.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
+ {
+ T value = _lastModifiedFiles.get(path);
+
+ if (_comparator.compare(value, newValue) == 0)
+ {
+ // unchanged: keep it recorded if it still sits on the high watermark, do not report it
+ if (file.lastModified() == _currentHighWatermark)
+ {
+ _curModifiedFiles.put(path, newValue);
+ }
+ return;
+ }
+ }
+
+ // a newer timestamp raises the high watermark and restarts the record of
+ // files carrying that timestamp
+ if (file.lastModified() > _currentHighWatermark)
+ {
+ _currentHighWatermark = file.lastModified();
+
+ _curModifiedFiles.clear();
+ _curModifiedFiles.put(path, newValue);
+ }
+ else if (file.lastModified() == _currentHighWatermark)
+ {
+ _curModifiedFiles.put(path, newValue);
+ }
+
+ // debug
+// logger.error("file: " + file.getAbsolutePath() + " changed@" + file.lastModified() + " (" +
+// new Date(file.lastModified()) + ")");
+ results.add(file);
+ }
+
+ /**
+ * Walker callback for a directory: decides whether the directory counts as
+ * changed since the previous walk and, if so, adds it to <code>results</code>.
+ *
+ * @param dir     directory being visited
+ * @param depth   not used by this method
+ * @param results collection that receives the entries considered changed
+ * @return always true
+ */
+ @Override
+ protected boolean handleDirectory(File dir, int depth, Collection results) throws IOException
+ {
+ // older than the timestamp recorded by the previous walk: nothing to report
+ if (dir.lastModified() < _lastNotifiedTime)
+ {
+ return true;
+ }
+
+ String path = getRelativePath(dir.getAbsolutePath());
+ T newValue = null;
+ try
+ {
+ newValue = getProperty(path);
+ }
+ catch (PropertyStoreException e)
+ {
+ // newValue stays null when the read fails
+ logger.error("fail to get property, path:" + path, e);
+ }
+
+ // same timestamp as the previous walk and the path was recorded then:
+ // compare the content to tell whether it really changed
+ if (dir.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
+ {
+ T value = _lastModifiedFiles.get(path);
+ if (_comparator.compare(value, newValue) == 0)
+ {
+ // unchanged: keep it recorded if it still sits on the high watermark, do not report it
+ if (dir.lastModified() == _currentHighWatermark)
+ {
+ _curModifiedFiles.put(path, newValue);
+ }
+ return true;
+ }
+ }
+ // NOTE(review): this put is unconditional, unlike handleFile, so a directory
+ // below the high watermark is also recorded - confirm this is intended
+ _curModifiedFiles.put(path, newValue);
+
+ // a newer timestamp raises the high watermark and restarts the record of
+ // entries carrying that timestamp
+ if (dir.lastModified() > _currentHighWatermark)
+ {
+ _currentHighWatermark = dir.lastModified();
+
+ _curModifiedFiles.clear();
+ _curModifiedFiles.put(path, newValue);
+ }
+ else if (dir.lastModified() == _currentHighWatermark)
+ {
+ _curModifiedFiles.put(path, newValue);
+ }
+
+ logger.debug("dir: " + dir.getAbsolutePath() + " changed@" + dir.lastModified() +
+ " (" + new Date(dir.lastModified()) + ")");
+ results.add(dir);
+
+ return true;
+ }
+
+ /**
+ * Performs one refresh round: walks the root directory under the read lock
+ * to collect the changed entries, then notifies the registered listeners
+ * about each of them, in sorted path order.
+ */
+ public void walk()
+ {
+ HashSet<File> files = new HashSet<File>();
+
+ try
+ {
+ // the watermark starts from the timestamp recorded by the previous round
+ _currentHighWatermark = _lastNotifiedTime;
+ _readWriteLock.readLock().lock();
+ super.walk(_propertyStoreRootDir, files);
+ }
+ catch (IOException e)
+ {
+ logger.error("IO exception when walking through dir:" + _propertyStoreRootDir, e);
+ }
+ finally
+ {
+ _lastNotifiedTime = _currentHighWatermark;
+ _lastModifiedFiles.clear();
+
+ // swap the two maps: this round's record becomes the "last" record and
+ // the cleared map is reused for the next round
+ Map<String, T> temp = _lastModifiedFiles;
+ _lastModifiedFiles = _curModifiedFiles;
+ _curModifiedFiles = temp;
+ _readWriteLock.readLock().unlock();
+ }
+
+ // TODO see if we can use DirectoryFileComparator.DIRECTORY_COMPARATOR.sort()
+ File[] fileArray = new File[files.size()];
+ fileArray = files.toArray(fileArray);
+ Arrays.sort(fileArray, new Comparator<File>() {
+
+ @Override
+ public int compare(File file1, File file2)
+ {
+ return file1.getAbsoluteFile().compareTo(file2.getAbsoluteFile());
+ }
+
+ });
+
+
+ // notify listeners
+ for (int i = 0; i < fileArray.length; i++)
+ {
+ File file = fileArray[i];
+
+ // debug
+// logger.error("Before send notification of " + file.getAbsolutePath() + " to listeners " + _fileChangeListeners);
+
+ for (Map.Entry< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > entry : _fileChangeListeners.entrySet())
+ {
+ String absPath = file.getAbsolutePath();
+ // plain string-prefix match between the changed path and the subscribed path
+ if (absPath.startsWith(entry.getKey()))
+ {
+ for (PropertyChangeListener<T> listener : entry.getValue())
+ {
+ if (listener instanceof FileCallbackHandler)
+ {
+ // handler is referenced only by the commented-out debug statement below
+ FileCallbackHandler handler = (FileCallbackHandler) listener;
+
+ // debug
+// logger.error("Send notification of " + file.getAbsolutePath() + " to listener:" + handler.getListener());
+ }
+ listener.onPropertyChange(getRelativePath(absPath));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public FilePropertyStoreRefreshThread(ReadWriteLock readWriteLock)
+ {
+ _dirWalker = new PropertyStoreDirWalker(_rootNamespace, readWriteLock);
+ }
+
+ /**
+  * Refresh loop: walks the store, counts down the first-refresh latch and
+  * sleeps for REFRESH_PERIOD milliseconds, until the stop flag is set.
+  */
+ @Override
+ public void run()
+ {
+   for (;;)
+   {
+     if (_stopRefreshThread.get())
+     {
+       break;
+     }
+
+     _dirWalker.walk();
+     _firstRefreshCounter.countDown();
+
+     try
+     {
+       Thread.sleep(REFRESH_PERIOD);
+     }
+     catch (InterruptedException ie)
+     {
+       // do nothing; the stop flag is re-checked at the top of the loop
+     }
+   }
+
+   logger.info("Quitting file property store refresh thread");
+ }
+
+ }
+
+// public FilePropertyStore(final PropertySerializer<T> serializer)
+// {
+// this(serializer, System.getProperty("java.io.tmpdir"));
+// }
+
+ /**
+  * Creates a file property store and its root directory.
+  *
+  * @param serializer    converts values to and from bytes
+  * @param rootNamespace root directory of the store; leading slashes are
+  *                      collapsed into exactly one
+  * @param comparator    used by the refresh walker to compare values
+  */
+ public FilePropertyStore(final PropertySerializer<T> serializer, String rootNamespace,
+     final PropertyJsonComparator<T> comparator)
+ {
+   this._serializer = serializer;
+   this._comparator = comparator;
+   this._stopRefreshThread = new AtomicBoolean(false);
+   this._firstRefreshCounter = new CountDownLatch(1);
+   this._readWriteLock = new ReentrantReadWriteLock();
+
+   this._fileChangeListeners = new ConcurrentHashMap< String, CopyOnWriteArraySet<PropertyChangeListener<T> > >();
+
+   // skip all leading slashes, then put a single one back
+   int firstNonSlash = 0;
+   while (firstNonSlash < rootNamespace.length() && rootNamespace.charAt(firstNonSlash) == '/')
+   {
+     firstNonSlash++;
+   }
+   this._rootNamespace = "/" + rootNamespace.substring(firstNonSlash);
+
+   createRootNamespace();
+ }
+
+
+ @Override
+ public boolean start()
+ {
+ // check if start has already been invoked
+ if (_firstRefreshCounter.getCount() == 0)
+ return true;
+
+ logger.debug("starting file property store polling thread, id=" + _id);
+
+ _stopRefreshThread.set(false);
+ _refreshThread = new Thread(new FilePropertyStoreRefreshThread(_readWriteLock),
+ "FileRefreshThread_" + _id);
+ // _refreshThread.setDaemon(true);
+ _refreshThread.start();
+
+ try
+ {
+ boolean timeout = !_firstRefreshCounter.await(TIMEOUT, TimeUnit.SECONDS);
+ if (timeout)
+ {
+ throw new Exception("Timeout while waiting for the first refresh to complete");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ return true;
+ }
+
+ /**
+  * Signals the polling thread to stop and waits for it to finish.
+  *
+  * Only the call that flips the stop flag from false to true waits. If the
+  * wait is interrupted, the calling thread's interrupt status is restored.
+  *
+  * @return always true
+  */
+ @Override
+ public boolean stop()
+ {
+   if (_stopRefreshThread.compareAndSet(false, true))
+   {
+     if (_refreshThread != null)
+     {
+       try
+       {
+         _refreshThread.join();
+       }
+       catch (InterruptedException e)
+       {
+         // keep the interrupt visible to the caller instead of swallowing it
+         Thread.currentThread().interrupt();
+         logger.warn("Interrupted while waiting for the refresh thread to quit", e);
+       }
+     }
+   }
+   return true;
+ }
+
+ private String getPath(String key)
+ {
+ // Strip off leading slash
+ while (key.startsWith("/"))
+ {
+ // key = key.substring(1, key.length());
+ key = key.substring(1);
+ }
+
+ // String path = key.equals(ROOT) ? _rootNamespace : (_rootNamespace + "/" + key);
+ String path = key.equals("") ? _rootNamespace : (_rootNamespace + "/" + key);
+ return path;
+ }
+
+ /**
+  * Strips the root namespace from a path.
+  *
+  * @param path path to convert
+  * @return ROOT when the path equals the root namespace, the remainder
+  *         after the root namespace when the path starts with it, and the
+  *         unchanged path (after logging a warning) otherwise
+  */
+ private String getRelativePath(String path)
+ {
+   if (path.startsWith(_rootNamespace))
+   {
+     return path.equals(_rootNamespace) ? ROOT : path.substring(_rootNamespace.length());
+   }
+
+   logger.warn("path does NOT start with: " + _rootNamespace);
+   return path;
+ }
+
+ /**
+  * Creates the directory of the root namespace.
+  */
+ public void createRootNamespace()
+ {
+   this.createPropertyNamespace(ROOT);
+ }
+
+ /**
+  * Creates the directory for the given namespace, including missing parent
+  * directories, while holding the write lock. An existing directory and a
+  * failed creation are both only logged.
+  *
+  * @param prefix namespace, relative to the root namespace
+  */
+ @Override
+ public void createPropertyNamespace(String prefix)
+ {
+   final String path = getPath(prefix);
+   final File dir = new File(path);
+   try
+   {
+     _readWriteLock.writeLock().lock();
+     if (dir.exists())
+     {
+       logger.warn(path + " already exists");
+     }
+     else if (!dir.mkdirs())
+     {
+       logger.warn("Failed to create: " + path);
+     }
+   }
+   catch (Exception e)
+   {
+     logger.error("Failed to create dir: " + path + "\nexception:" + e);
+   }
+   finally
+   {
+     _readWriteLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Writes the serialized value to the file backing the given key, creating
+  * the file first if it does not exist.
+  *
+  * The value is serialized before the file is opened for writing, so a
+  * serializer failure does not leave a truncated file behind. The stream
+  * is closed before the write lock is released.
+  *
+  * I/O failures are logged and not rethrown.
+  *
+  * @param key   property key
+  * @param value value to store
+  */
+ @Override
+ public void setProperty(String key, T value) throws PropertyStoreException
+ {
+   String path = getPath(key);
+   File file = new File(path);
+
+   // TODO create non-exist dirs recursively
+   _readWriteLock.writeLock().lock();
+   try
+   {
+     // serialize first: opening the FileOutputStream truncates the file
+     byte[] bytes = _serializer.serialize(value);
+
+     if (!file.exists())
+     {
+       FileUtils.touch(file);
+     }
+
+     FileOutputStream fout = new FileOutputStream(file);
+     try
+     {
+       fout.write(bytes);
+     }
+     finally
+     {
+       try
+       {
+         fout.close();
+       }
+       catch (IOException e)
+       {
+         logger.error("fail to close file. key:" + key, e);
+       }
+     }
+   }
+   catch (IOException e)
+   {
+     logger.error("fail to set property. key:" + key +
+         ", value:" + value, e);
+   }
+   finally
+   {
+     _readWriteLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Reads the value stored under the given key without collecting stat
+  * information.
+  *
+  * @param key property key
+  * @return the result of {@link #getProperty(String, PropertyStat)} called
+  *         with a null stat
+  */
+ @Override
+ public T getProperty(String key) throws PropertyStoreException
+ {
+   final PropertyStat noStat = null;
+   return getProperty(key, noStat);
+ }
+
+ /**
+  * Reads and deserializes the value stored under the given key.
+  *
+  * The file content is read in a loop until the buffer is full or the end
+  * of the file is reached, because a single read call may return fewer
+  * bytes than requested. The stream is closed before the read lock is
+  * released.
+  *
+  * @param key          property key
+  * @param propertyStat if non-null, receives the file's last-modified time
+  * @return the deserialized value; null if the file does not exist, cannot
+  *         be opened, is empty, or an I/O error occurs (the error is logged)
+  */
+ @Override
+ public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
+ {
+   String path = getPath(key);
+   File file = new File(path);
+
+   // TODO need a timeout on lock operation
+   _readWriteLock.readLock().lock();
+   try
+   {
+     if (!file.exists())
+     {
+       return null;
+     }
+
+     FileInputStream fin = new FileInputStream(file);
+     try
+     {
+       int availableBytes = fin.available();
+       if (availableBytes == 0)
+       {
+         return null;
+       }
+
+       byte[] bytes = new byte[availableBytes];
+       int offset = 0;
+       while (offset < bytes.length)
+       {
+         int count = fin.read(bytes, offset, bytes.length - offset);
+         if (count < 0)
+         {
+           break;
+         }
+         offset += count;
+       }
+
+       if (offset == 0)
+       {
+         // nothing could be read: treat like an empty file
+         return null;
+       }
+       if (offset < bytes.length)
+       {
+         // end of file reached early: drop the unfilled tail of the buffer
+         bytes = Arrays.copyOf(bytes, offset);
+       }
+
+       if (propertyStat != null)
+       {
+         propertyStat.setLastModifiedTime(file.lastModified());
+       }
+
+       return _serializer.deserialize(bytes);
+     }
+     finally
+     {
+       try
+       {
+         fin.close();
+       }
+       catch (IOException e)
+       {
+         logger.error("fail to close file. key:" + key, e);
+       }
+     }
+   }
+   catch (FileNotFoundException e)
+   {
+     return null;
+   }
+   catch (IOException e)
+   {
+     logger.error("fail to get property. key:" + key, e);
+   }
+   finally
+   {
+     _readWriteLock.readLock().unlock();
+   }
+
+   return null;
+ }
+
+ /**
+  * Deletes the file backing the given key while holding the write lock.
+  * A missing file is ignored; a failed deletion is only logged.
+  *
+  * @param key property key
+  */
+ @Override
+ public void removeProperty(String key) throws PropertyStoreException
+ {
+   final String path = getPath(key);
+   final File file = new File(path);
+
+   try
+   {
+     _readWriteLock.writeLock().lock();
+     if (file.exists() && !file.delete())
+     {
+       logger.error("fail to remove file. path:" + path);
+     }
+   }
+   catch (Exception e)
+   {
+     logger.error("fail to remove file. path:" + path, e);
+   }
+   finally
+   {
+     _readWriteLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Removes the directory of the root namespace.
+  */
+ public void removeRootNamespace() throws PropertyStoreException
+ {
+   this.removeNamespace(ROOT);
+ }
+
+ /**
+  * Deletes the directory of the given namespace while holding the write
+  * lock. An I/O failure is logged and not rethrown.
+  *
+  * @param prefix namespace, relative to the root namespace
+  */
+ @Override
+ public void removeNamespace(String prefix) throws PropertyStoreException
+ {
+   final String path = getPath(prefix);
+   final File namespaceDir = new File(path);
+
+   try
+   {
+     _readWriteLock.writeLock().lock();
+     FileUtils.deleteDirectory(namespaceDir);
+   }
+   catch (IOException e)
+   {
+     logger.error("fail to remove namespace, path:" + path, e);
+   }
+   finally
+   {
+     _readWriteLock.writeLock().unlock();
+   }
+ }
+
+ private void doGetPropertyNames(String path, List<String> leafNodes)
+ throws PropertyStoreException
+ {
+ File file = new File(path);
+ if (!file.exists())
+ {
+ return;
+ }
+
+ // List<String> childs = _zkClient.getChildren(path);
+ if (file.isDirectory())
+ {
+ String[] childs = file.list();
+ if (childs == null || childs.length == 0)
+ {
+ return;
+ }
+ for (String child : childs)
+ {
+ String pathToChild = path + "/" + child;
+ doGetPropertyNames(pathToChild, leafNodes);
+ }
+ }
+ else if (file.isFile())
+ {
+ // getProperty(getRelativePath(path));
+ leafNodes.add(getRelativePath(path));
+ return;
+ }
+ }
+
+
+ /**
+  * Lists the relative paths of all files at or below the given prefix,
+  * collected under the read lock.
+  *
+  * @param prefix namespace, relative to the root namespace
+  * @return the sorted list of property names
+  */
+ @Override
+ public List<String> getPropertyNames(String prefix) throws PropertyStoreException
+ {
+   final String startPath = getPath(prefix);
+   final List<String> names = new ArrayList<String>();
+
+   try
+   {
+     _readWriteLock.readLock().lock();
+     doGetPropertyNames(startPath, names);
+   }
+   finally
+   {
+     _readWriteLock.readLock().unlock();
+   }
+
+   // sorted so that the order is deterministic
+   Collections.sort(names);
+   return names;
+ }
+
+ /**
+  * Not supported by this store.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
+ {
+   final String message = "setPropertyDelimiter() is NOT supported by FilePropertyStore";
+   throw new UnsupportedOperationException(message);
+ }
+
+ /**
+  * Registers a listener for changes under the given prefix. A null
+  * listener is ignored.
+  *
+  * @param prefix   namespace, relative to the root namespace
+  * @param listener listener to register
+  */
+ @Override
+ public void subscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
+     throws PropertyStoreException
+ {
+   if (listener == null)
+   {
+     return;
+   }
+
+   final String path = getPath(prefix);
+   synchronized (_fileChangeListeners)
+   {
+     CopyOnWriteArraySet<PropertyChangeListener<T> > registered = _fileChangeListeners.get(path);
+     if (registered == null)
+     {
+       registered = new CopyOnWriteArraySet<PropertyChangeListener<T> >();
+       _fileChangeListeners.put(path, registered);
+     }
+     registered.add(listener);
+   }
+ }
+
+ /**
+  * Removes a listener registered for the given prefix and drops the map
+  * entry of that prefix once no listener is left. A null listener is
+  * ignored.
+  *
+  * @param prefix   namespace, relative to the root namespace
+  * @param listener listener to remove
+  */
+ @Override
+ public void unsubscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
+     throws PropertyStoreException
+ {
+   if (listener == null)
+   {
+     return;
+   }
+
+   final String path = getPath(prefix);
+   synchronized (_fileChangeListeners)
+   {
+     final Set<PropertyChangeListener<T> > registered = _fileChangeListeners.get(path);
+     if (registered == null)
+     {
+       _fileChangeListeners.remove(path);
+       return;
+     }
+
+     registered.remove(listener);
+     if (registered.isEmpty())
+     {
+       _fileChangeListeners.remove(path);
+     }
+   }
+ }
+
+ /**
+  * @return always false
+  */
+ @Override
+ public boolean canParentStoreData()
+ {
+   final boolean parentCanStoreData = false;
+   return parentCanStoreData;
+ }
+
+ /**
+  * @return the root namespace of this store, as normalized by the constructor
+  */
+ @Override
+ public String getPropertyRootNamespace()
+ {
+   return this._rootNamespace;
+ }
+
+ /**
+  * Applies the updater to the value stored under the given key, passing
+  * true for createIfAbsent.
+  *
+  * @param key     property key
+  * @param updater computes the new value from the current one
+  */
+ @Override
+ public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater)
+ {
+   final boolean createIfAbsent = true;
+   updatePropertyUntilSucceed(key, updater, createIfAbsent);
+ }
+
+ /**
+ * Reads the current value of the key, applies the updater to it and writes
+ * the result back, while holding the write lock and a file lock on the
+ * backing file. Any exception is logged and not rethrown.
+ *
+ * @param key            property key
+ * @param updater        computes the new value from the current one
+ * @param createIfAbsent NOTE(review): not read anywhere in this method; a
+ *                       missing file is always created - confirm intent
+ */
+ @Override
+ public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater,
+ boolean createIfAbsent)
+ {
+ String path = getPath(key);
+ File file = new File(path);
+ RandomAccessFile raFile = null;
+ FileLock fLock = null;
+
+ try
+ {
+ _readWriteLock.writeLock().lock();
+ if (!file.exists())
+ {
+ FileUtils.touch(file);
+ }
+
+ // lock the file through a channel obtained from a RandomAccessFile
+ raFile = new RandomAccessFile(file, "rw");
+ FileChannel fChannel = raFile.getChannel();
+ fLock = fChannel.lock();
+
+ // read-modify-write through the regular accessors
+ T current = getProperty(key);
+ T update = updater.update(current);
+ setProperty(key, update);
+ }
+ catch (Exception e)
+ {
+ logger.error("fail to updatePropertyUntilSucceed, path:" + path, e);
+ }
+ finally
+ {
+ // the write lock is released before the file lock and the file handle
+ _readWriteLock.writeLock().unlock();
+ try
+ {
+ if (fLock != null && fLock.isValid())
+ {
+ fLock.release();
+ }
+
+ if (raFile != null)
+ {
+ raFile.close();
+ }
+ }
+ catch (IOException e)
+ {
+ logger.error("fail to close file, path:" + path, e);
+ }
+ }
+ }
+
+
+ /**
+  * Replaces the serializer used by this store, under the write lock.
+  *
+  * @param serializer the new serializer
+  */
+ @Override
+ public void setPropertySerializer(PropertySerializer<T> serializer)
+ {
+   _readWriteLock.writeLock().lock();
+   try
+   {
+     this._serializer = serializer;
+   }
+   finally
+   {
+     _readWriteLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Compare-and-set that passes false for createIfAbsent.
+  *
+  * @param key        property key
+  * @param expected   value the current content is compared against
+  * @param update     value to write when the comparison yields 0
+  * @param comparator compares the current value with the expected one
+  * @return the result of the five-argument overload
+  */
+ @Override
+ public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
+ {
+   final boolean createIfAbsent = false;
+   return compareAndSet(key, expected, update, comparator, createIfAbsent);
+ }
+
+ /**
+ * Writes <code>update</code> to the key if the comparator reports the current
+ * value as equal to <code>expected</code>. The check and the write run under
+ * the write lock and a file lock on the backing file.
+ *
+ * @param key            property key
+ * @param expected       value the current content is compared against
+ * @param update         value to write when the comparison yields 0
+ * @param comparator     compares the current value with the expected one
+ * @param createIfAbsent when true, the file is created before the comparison
+ * @return true if the value was written; false if the comparison failed or
+ *         an exception occurred (the exception is logged)
+ */
+ @Override
+ public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator,
+ boolean createIfAbsent)
+ {
+ String path = getPath(key);
+ File file = new File(path);
+// FileInputStream fin = null;
+// FileOutputStream fout = null;
+ RandomAccessFile raFile = null;
+ FileLock fLock = null;
+
+ try
+ {
+ _readWriteLock.writeLock().lock();
+
+ if (createIfAbsent)
+ {
+ file.createNewFile();
+ }
+
+// fin = new FileInputStream(file);
+// FileChannel fChannel = fin.getChannel();
+ // NOTE(review): per the JDK docs, "rw" mode attempts to create a missing
+ // file, so the file may be created even when createIfAbsent is false - confirm
+ raFile = new RandomAccessFile(file, "rw");
+ FileChannel fChannel = raFile.getChannel();
+ fLock = fChannel.lock();
+
+ T current = getProperty(key);
+ if (comparator.compare(current, expected) == 0)
+ {
+// fout = new FileOutputStream(file);
+//
+// byte[] bytes = _serializer.serialize(update);
+// fout.write(bytes);
+ setProperty(key, update);
+ return true;
+ }
+
+ return false;
+ }
+ catch (FileNotFoundException e)
+ {
+ logger.error("fail to compareAndSet. path:" + path, e);
+ return false;
+ }
+ catch (Exception e)
+ {
+ logger.error("fail to compareAndSet. path:" + path, e);
+ return false;
+ }
+ finally
+ {
+ // the write lock is released before the file lock and the file handle
+ _readWriteLock.writeLock().unlock();
+ try
+ {
+ if (fLock != null && fLock.isValid())
+ {
+ fLock.release();
+ }
+
+ if (raFile != null)
+ {
+ raFile.close();
+ }
+
+// if (fin != null)
+// {
+// fin.close();
+// }
+//
+// if (fout != null)
+// {
+// fout.close();
+// }
+ }
+ catch (IOException e)
+ {
+ logger.error("fail to close file. path:" + path, e);
+ }
+ }
+
+ }
+
+ /**
+  * Tells whether a file or directory exists for the given key. The check
+  * runs under the read lock, which is released even if the check throws.
+  *
+  * @param key property key
+  * @return true if the backing path exists
+  */
+ @Override
+ public boolean exists(String key)
+ {
+   String path = getPath(key);
+   File file = new File(path);
+
+   _readWriteLock.readLock().lock();
+   try
+   {
+     return file.exists();
+   }
+   finally
+   {
+     _readWriteLock.readLock().unlock();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/package-info.java b/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
new file mode 100644
index 0000000..33d87d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix implementation of file-based property store (Deprecated)
+ *
+ */
+package org.apache.helix.store.file;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/package-info.java b/helix-core/src/main/java/org/apache/helix/store/package-info.java
new file mode 100644
index 0000000..62c1fad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix application property store classes
+ *
+ */
+package org.apache.helix.store;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java b/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
new file mode 100644
index 0000000..8133bc8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Pairs the raw bytes of a property with the ZooKeeper {@link Stat} that
+ * describes it.
+ */
+public class PropertyItem
+{
+  byte[] _value;
+  Stat _stat;
+
+  /**
+   * @param value raw bytes of the property
+   * @param stat  stat the version and modification time are read from
+   */
+  public PropertyItem(byte[] value, Stat stat)
+  {
+    this._value = value;
+    this._stat = stat;
+  }
+
+  /** @return the raw bytes passed to the constructor (not copied) */
+  public byte[] getBytes()
+  {
+    return this._value;
+  }
+
+  /** @return the version reported by the stat */
+  public int getVersion()
+  {
+    final int version = _stat.getVersion();
+    return version;
+  }
+
+  /** @return the modification time reported by the stat */
+  public long getLastModifiedTime()
+  {
+    final long modifiedTime = _stat.getMtime();
+    return modifiedTime;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
new file mode 100644
index 0000000..b5092b6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
@@ -0,0 +1,735 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+@Deprecated
+public class ZKPropertyStore<T> implements
+ PropertyStore<T>,
+ IZkStateListener,
+ IZkDataListener // , IZkChildListener,
+{
+ private static Logger LOG = Logger.getLogger(ZKPropertyStore.class);
+
+ /**
+  * Adapts a typed {@link DataUpdater} to raw byte[] payloads: the current bytes
+  * are deserialized, handed to the wrapped updater, and the result is serialized
+  * again.
+  */
+ class ByteArrayUpdater implements DataUpdater<byte[]>
+ {
+   final DataUpdater<T> _updater;
+   final PropertySerializer<T> _serializer;
+
+   ByteArrayUpdater(DataUpdater<T> updater, PropertySerializer<T> serializer)
+   {
+     this._updater = updater;
+     this._serializer = serializer;
+   }
+
+   /**
+    * @param current bytes currently stored, may be null
+    * @return the serialized updated value, or null if a PropertyStoreException
+    *         is raised along the way
+    */
+   @Override
+   public byte[] update(byte[] current)
+   {
+     try
+     {
+       final T before = (current == null) ? null : _serializer.deserialize(current);
+       final T after = _updater.update(before);
+       return _serializer.serialize(after);
+     }
+     catch (PropertyStoreException e)
+     {
+       LOG.error("Exception in update. Updater: " + _updater, e);
+       return null;
+     }
+   }
+ }
+
+ private volatile boolean _isConnected = false;
+ private volatile boolean _hasSessionExpired = false;
+
+ protected final ZkClient _zkClient;
+ protected PropertySerializer<T> _serializer;
+ protected final String _root;
+
+ // zookeeperPath->userCallback->zkCallback
+ private final Map<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>> _callbackMap =
+ new HashMap<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>>();
+
+ // TODO cache capacity should be bounded
+ private final Map<String, PropertyItem> _cache =
+ new ConcurrentHashMap<String, PropertyItem>();
+
+ /**
+ * The given zkClient is assumed to serialize and deserialize raw byte[]
+ * for the given root and its descendants.
+ */
+ public ZKPropertyStore(ZkClient zkClient, final PropertySerializer<T> serializer,
+ String root)
+ {
+ if (zkClient == null || serializer == null || root == null)
+ {
+ throw new IllegalArgumentException("zkClient|serializer|root can't be null");
+ }
+
+ _root = normalizeKey(root);
+ _zkClient = zkClient;
+
+ setPropertySerializer(serializer);
+
+ _zkClient.createPersistent(_root, true);
+ _zkClient.subscribeStateChanges(this);
+ }
+
+ /**
+  * Returns the key with exactly one leading slash, whatever number of leading
+  * slashes (including none) it had before.
+  *
+  * @throws IllegalArgumentException if key is null
+  */
+ private String normalizeKey(String key)
+ {
+   if (key == null)
+   {
+     LOG.error("Key can't be null");
+     throw new IllegalArgumentException("Key can't be null");
+   }
+
+   // skip every leading slash, then put a single one back
+   int firstNonSlash = 0;
+   while (firstNonSlash < key.length() && key.charAt(firstNonSlash) == '/')
+   {
+     firstNonSlash++;
+   }
+
+   return "/" + key.substring(firstNonSlash);
+ }
+
+ private String getAbsolutePath(String key)
+ {
+ key = normalizeKey(key);
+ if (key.equals("/"))
+ {
+ return _root;
+ }
+ else
+ {
+ return _root + key;
+ }
+ }
+
+ // always a return normalized key
+ String getRelativePath(String path)
+ {
+ if (!path.startsWith(_root))
+ {
+ String errMsg = path + "does NOT start with property store's root: " + _root;
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ if (path.equals(_root))
+ {
+ return "/";
+ }
+ else
+ {
+ return path.substring(_root.length());
+ }
+ }
+
+ /**
+  * Creates the znode for the given prefix unless it already exists.
+  *
+  * @param prefix namespace key relative to the store root
+  * @throws PropertyStoreException carrying the string form of any failure
+  */
+ @Override
+ public void createPropertyNamespace(String prefix) throws PropertyStoreException
+ {
+   String path = getAbsolutePath(prefix);
+   try
+   {
+     if (!_zkClient.exists(path))
+     {
+       _zkClient.createPersistent(path, true);
+     }
+   }
+   catch (Exception e)
+   {
+     // message now names the method correctly (was "creatPropertyNamespace")
+     LOG.error("Exception in createPropertyNamespace(" + prefix + ")", e);
+     throw new PropertyStoreException(e.toString());
+   }
+ }
+
+ /**
+  * Serializes the value and writes it to the znode of the key, creating that
+  * znode first when it does not exist.
+  *
+  * @throws PropertyStoreException carrying the string form of any failure
+  */
+ @Override
+ public void setProperty(String key, final T value) throws PropertyStoreException
+ {
+   final String zkPath = getAbsolutePath(key);
+
+   try
+   {
+     final boolean missing = !_zkClient.exists(zkPath);
+     if (missing)
+     {
+       _zkClient.createPersistent(zkPath, true);
+     }
+
+     // the serializer is expected to cope with value == null
+     final byte[] payload = _serializer.serialize(value);
+     _zkClient.writeData(zkPath, payload);
+   }
+   catch (Exception e)
+   {
+     LOG.error("Exception when setProperty(" + key + ", " + value + ")", e);
+     throw new PropertyStoreException(e.toString());
+   }
+ }
+
+ /**
+  * Reads a property without reporting its stat; delegates to the two-argument
+  * variant with a null PropertyStat.
+  */
+ @Override
+ public T getProperty(String key) throws PropertyStoreException
+ {
+   final PropertyStat noStat = null;
+   return getProperty(key, noStat);
+ }
+
+ // bytes and stat are not null
+ private T getValueAndStat(byte[] bytes, Stat stat, PropertyStat propertyStat) throws PropertyStoreException
+ {
+ T value = _serializer.deserialize(bytes);
+
+ if (propertyStat != null)
+ {
+ propertyStat.setLastModifiedTime(stat.getMtime());
+ propertyStat.setVersion(stat.getVersion());
+ }
+ return value;
+ }
+
+ /**
+  * Reads the property for key, serving it from the local cache when the cached
+  * entry is still current and re-reading it through the zkClient otherwise.
+  *
+  * @param key          store key; normalized before use
+  * @param propertyStat optional holder that receives version and modification
+  *                     time of the returned value; may be null
+  * @return the deserialized value, or null when no bytes could be obtained
+  * @throws PropertyStoreException carrying the string form of any failure
+  */
+ @Override
+ public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
+ {
+ String normalizedKey = normalizeKey(key);
+ String path = getAbsolutePath(normalizedKey);
+ Stat stat = new Stat();
+
+ T value = null;
+ try
+ {
+ // same monitor as handleDataDeleted(), which evicts entries from the cache
+ synchronized (_cache)
+ {
+ PropertyItem item = _cache.get(normalizedKey);
+ // register this store as data listener for the path; handleDataDeleted()
+ // removes the cache entry again once the node goes away
+ _zkClient.subscribeDataChanges(path, this);
+ if (item != null)
+ {
+ // cache hit
+ stat = _zkClient.getStat(path);
+ // when getStat() returns null nothing is read: value stays null and the
+ // cached entry is left as it is
+ if (stat != null)
+ {
+ // a different creation zxid or a newer version means the cached bytes are outdated
+ if (item._stat.getCzxid() != stat.getCzxid()
+ || item.getVersion() < stat.getVersion())
+ {
+ // stale data in cache
+ byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
+ if (bytes != null)
+ {
+ value = getValueAndStat(bytes, stat, propertyStat);
+ _cache.put(normalizedKey, new PropertyItem(bytes, stat));
+ }
+ }
+ else
+ {
+ // valid data in cache
+ // item.getBytes() should not be null
+ value = getValueAndStat(item.getBytes(), stat, propertyStat);
+ }
+ }
+ }
+ else
+ {
+ // cache miss
+ byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
+ // null bytes are not cached; the method then returns null
+ if (bytes != null)
+ {
+ value = getValueAndStat(bytes, stat, propertyStat);
+ _cache.put(normalizedKey, new PropertyItem(bytes, stat));
+ }
+ }
+ }
+ return value;
+ }
+ catch (Exception e)
+ {
+ // every failure is logged and surfaced as PropertyStoreException
+ LOG.error("Exception in getProperty(" + key + ")", e);
+ throw (new PropertyStoreException(e.toString()));
+ }
+ }
+
+ /**
+  * Deletes the znode of the key. A znode that is already gone is not an error.
+  *
+  * @throws PropertyStoreException carrying the string form of any other failure
+  */
+ @Override
+ public void removeProperty(String key) throws PropertyStoreException
+ {
+   final String zkPath = getAbsolutePath(normalizeKey(key));
+
+   try
+   {
+     _zkClient.delete(zkPath);
+   }
+   catch (ZkNoNodeException ignored)
+   {
+     // nothing to delete, which is fine
+   }
+   catch (Exception e)
+   {
+     LOG.error("Exception in removeProperty(" + key + ")", e);
+     throw (new PropertyStoreException(e.toString()));
+   }
+ }
+
+ /** @return the normalized root path this store was created with */
+ @Override
+ public String getPropertyRootNamespace()
+ {
+   return this._root;
+ }
+
+ /**
+  * Recursively deletes the znode of the prefix. A znode that is already gone is
+  * not an error.
+  *
+  * @param prefix namespace key relative to the store root
+  * @throws PropertyStoreException carrying the string form of any other failure
+  */
+ @Override
+ public void removeNamespace(String prefix) throws PropertyStoreException
+ {
+   String path = getAbsolutePath(prefix);
+
+   try
+   {
+     _zkClient.deleteRecursive(path);
+   }
+   catch (ZkNoNodeException e)
+   {
+     // OK: nothing to delete
+   }
+   catch (Exception e)
+   {
+     // message now names this method (was copy-pasted as "removeProperty")
+     LOG.error("Exception in removeNamespace(" + prefix + ")", e);
+     throw (new PropertyStoreException(e.toString()));
+   }
+ }
+
+ // prefix is always normalized
+ private void doGetPropertyNames(String prefix, List<String> leafNodes) throws PropertyStoreException
+ {
+ String path = getAbsolutePath(prefix);
+
+ if (!_zkClient.exists(path))
+ {
+ return;
+ }
+
+ List<String> childs = _zkClient.getChildren(path);
+ if (childs == null)
+ {
+ return;
+ }
+
+ if (childs.size() == 0)
+ {
+ // add leaf node to cache
+ // getProperty(prefix);
+ leafNodes.add(prefix);
+ return;
+ }
+
+ for (String child : childs)
+ {
+ String childPath = prefix.equals("/") ? prefix + child : prefix + "/" + child;
+ doGetPropertyNames(childPath, leafNodes);
+ }
+ }
+
+ /**
+  * Collects the keys of all leaf nodes below prefix.
+  *
+  * @return the leaf keys in sorted order; empty when there are none
+  */
+ @Override
+ public List<String> getPropertyNames(String prefix) throws PropertyStoreException
+ {
+   final String normalizedPrefix = normalizeKey(prefix);
+   final List<String> names = new ArrayList<String>();
+   doGetPropertyNames(normalizedPrefix, names);
+
+   // sorting makes the order deterministic; pointless below two elements
+   if (names.size() >= 2)
+   {
+     Collections.sort(names);
+   }
+
+   return names;
+ }
+
+ /**
+  * Not supported by this store.
+  *
+  * @throws PropertyStoreException always
+  */
+ @Override
+ public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
+ {
+   final String message = "setPropertyDelimiter() not implemented for ZKPropertyStore";
+   throw new PropertyStoreException(message);
+ }
+
+ /**
+  * Registers listener for changes at prefix. A listener that is already
+  * registered for the same path is left alone; a new one gets data and child
+  * subscriptions on the path followed by an initial child-change invocation.
+  *
+  * @throws IllegalArgumentException if listener is null
+  * @throws PropertyStoreException carrying the string form of any failure while
+  *         subscribing
+  */
+ @Override
+ public void subscribeForPropertyChange(String prefix,
+     final PropertyChangeListener<T> listener) throws PropertyStoreException
+ {
+   if (listener == null)
+   {
+     throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
+   }
+
+   final String zkPath = getAbsolutePath(prefix);
+
+   ZkCallbackHandler<T> newHandler = null;
+   synchronized (_callbackMap)
+   {
+     if (!_callbackMap.containsKey(zkPath))
+     {
+       _callbackMap.put(zkPath,
+           new HashMap<PropertyChangeListener<T>, ZkCallbackHandler<T>>());
+     }
+     final Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> handlers =
+         _callbackMap.get(zkPath);
+
+     if (!handlers.containsKey(listener))
+     {
+       newHandler = new ZkCallbackHandler<T>(_zkClient, this, prefix, listener);
+       handlers.put(listener, newHandler);
+     }
+   }
+
+   if (newHandler == null)
+   {
+     // listener was registered before, nothing to subscribe
+     return;
+   }
+
+   try
+   {
+     _zkClient.subscribeDataChanges(zkPath, newHandler);
+     _zkClient.subscribeChildChanges(zkPath, newHandler);
+
+     // initial invocation
+     newHandler.handleChildChange(zkPath, _zkClient.getChildren(zkPath));
+
+     LOG.debug("Subscribed changes for " + zkPath);
+   }
+   catch (Exception e)
+   {
+     LOG.error("Exception in subscribeForPropertyChange(" + prefix + ")", e);
+     throw (new PropertyStoreException(e.toString()));
+   }
+ }
+
+ /**
+  * Removes the data and child subscriptions of callback from prefix and,
+  * recursively, from every node currently below it.
+  *
+  * @param prefix   store key whose subtree is unsubscribed
+  * @param callback handler to unsubscribe
+  */
+ private void doUnsubscribeForPropertyChange(String prefix, ZkCallbackHandler<T> callback)
+ {
+   String path = getAbsolutePath(prefix);
+
+   _zkClient.unsubscribeDataChanges(path, callback);
+   _zkClient.unsubscribeChildChanges(path, callback);
+
+   List<String> childs;
+   try
+   {
+     childs = _zkClient.getChildren(path);
+   }
+   catch (ZkNoNodeException e)
+   {
+     // the node has been deleted in the meantime: there is no subtree left to walk
+     return;
+   }
+
+   if (childs == null || childs.isEmpty())
+   {
+     return;
+   }
+
+   for (String child : childs)
+   {
+     doUnsubscribeForPropertyChange(prefix + "/" + child, callback);
+   }
+ }
+
+ /**
+  * Unregisters listener from prefix and drops the subscriptions of its handler
+  * on the whole subtree. Unknown listeners are ignored.
+  *
+  * @throws IllegalArgumentException if listener is null
+  */
+ @Override
+ public void unsubscribeForPropertyChange(String prefix,
+     PropertyChangeListener<T> listener) throws PropertyStoreException
+ {
+   if (listener == null)
+   {
+     throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
+   }
+
+   String path = getAbsolutePath(prefix);
+   ZkCallbackHandler<T> callback = null;
+
+   synchronized (_callbackMap)
+   {
+     Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
+         _callbackMap.get(path);
+
+     // check for null BEFORE touching the map; the old code dereferenced it first
+     if (callbacks == null)
+     {
+       // no-op when the path is unknown, cleans up a null entry otherwise
+       _callbackMap.remove(path);
+     }
+     else
+     {
+       callback = callbacks.remove(listener);
+       if (callbacks.isEmpty())
+       {
+         _callbackMap.remove(path);
+       }
+     }
+   }
+
+   if (callback != null)
+   {
+     doUnsubscribeForPropertyChange(prefix, callback);
+     LOG.debug("Unsubscribed changes for " + path);
+   }
+ }
+
+ /** @return always false for this store */
+ @Override
+ public boolean canParentStoreData()
+ {
+   final boolean parentStoresData = false;
+   return parentStoresData;
+ }
+
+ /**
+  * Replaces the serializer used for all subsequent reads and writes.
+  *
+  * @throws IllegalArgumentException if serializer is null
+  */
+ @Override
+ public void setPropertySerializer(final PropertySerializer<T> serializer)
+ {
+   if (serializer != null)
+   {
+     _serializer = serializer;
+     return;
+   }
+
+   throw new IllegalArgumentException("serializer can't be null");
+ }
+
+ /**
+  * Same as the three-argument variant with createIfAbsent set to true.
+  */
+ @Override
+ public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater) throws PropertyStoreException
+ {
+   final boolean createIfAbsent = true;
+   updatePropertyUntilSucceed(key, updater, createIfAbsent);
+ }
+
+ /**
+  * Applies updater to the value stored under key through the zkClient's
+  * {@code updateDataSerialized}.
+  *
+  * @param key            store key
+  * @param updater        computes the new value from the current one
+  * @param createIfAbsent when true a missing znode is created first; when false
+  *                       a missing znode is reported as an error
+  * @throws PropertyStoreException if the znode is missing and must not be
+  *         created, or carrying the string form of any other failure
+  */
+ @Override
+ public void updatePropertyUntilSucceed(String key,
+     DataUpdater<T> updater,
+     boolean createIfAbsent) throws PropertyStoreException
+ {
+   String path = getAbsolutePath(key);
+   try
+   {
+     if (!_zkClient.exists(path))
+     {
+       if (!createIfAbsent)
+       {
+         throw new PropertyStoreException("Can't update " + key
+             + " since no node exists");
+       }
+       _zkClient.createPersistent(path, true);
+     }
+
+     _zkClient.updateDataSerialized(path, new ByteArrayUpdater(updater, _serializer));
+   }
+   catch (PropertyStoreException e)
+   {
+     // already a precise store exception: rethrow it unchanged instead of
+     // wrapping its toString() into a second PropertyStoreException
+     LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
+         + ")", e);
+     throw e;
+   }
+   catch (Exception e)
+   {
+     LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
+         + ")", e);
+     throw (new PropertyStoreException(e.toString()));
+   }
+ }
+
+ /**
+  * Same as the five-argument variant with createIfAbsent set to true.
+  */
+ @Override
+ public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
+ {
+   final boolean createIfAbsent = true;
+   return compareAndSet(key, expected, update, comparator, createIfAbsent);
+ }
+
+ /**
+  * Writes update only if the stored value compares equal to expected. The write
+  * is conditional on the version read together with the current value.
+  *
+  * @param createIfAbsent when true a missing znode is created before comparing;
+  *                       when false a missing znode yields false
+  * @return true if the value was written; false on mismatch, on a version
+  *         conflict, or on any other failure (which is only logged)
+  */
+ @Override
+ public boolean compareAndSet(String key,
+     T expected,
+     T update,
+     Comparator<T> comparator,
+     boolean createIfAbsent)
+ {
+   final String zkPath = getAbsolutePath(key);
+
+   // if two threads call with createIfAbsent=true, one creates the node and the
+   // other just goes through; on writeData() one of them succeeds and the other
+   // gets a ZkBadVersionException
+   if (!_zkClient.exists(zkPath))
+   {
+     if (!createIfAbsent)
+     {
+       return false;
+     }
+     _zkClient.createPersistent(zkPath, true);
+   }
+
+   try
+   {
+     final Stat readStat = new Stat();
+     final byte[] storedBytes = _zkClient.readDataAndStat(zkPath, readStat, true);
+     final T stored = (storedBytes == null) ? null : _serializer.deserialize(storedBytes);
+
+     if (comparator.compare(stored, expected) != 0)
+     {
+       return false;
+     }
+
+     final byte[] newBytes = _serializer.serialize(update);
+     _zkClient.writeData(zkPath, newBytes, readStat.getVersion());
+     return true;
+   }
+   catch (ZkBadVersionException e)
+   {
+     LOG.warn("Get BadVersion when writing to zookeeper. Mostly Ignorable due to contention");
+     return false;
+   }
+   catch (Exception e)
+   {
+     LOG.error("Exception when compareAndSet(" + key + ")", e);
+     return false;
+   }
+ }
+
+ /** @return whether the zkClient reports a znode for the key */
+ @Override
+ public boolean exists(String key)
+ {
+   return _zkClient.exists(getAbsolutePath(key));
+ }
+
+ /**
+  * Tracks the connection state: connected on SyncConnected, not connected on
+  * Disconnected and Expired; Expired additionally marks the session as expired.
+  * Other states are only logged.
+  */
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+   LOG.info("KeeperState:" + state);
+   switch (state)
+   {
+   case Expired:
+     _isConnected = false;
+     _hasSessionExpired = true;
+     break;
+   case Disconnected:
+     _isConnected = false;
+     break;
+   case SyncConnected:
+     _isConnected = true;
+     break;
+   default:
+     // no bookkeeping for the remaining states
+     break;
+   }
+ }
+
+ /**
+  * Re-installs the data and child subscriptions of every registered callback
+  * after a new session and triggers an initial child-change invocation for each.
+  * Empty or null registrations found on the way are logged and removed.
+  */
+ @Override
+ public void handleNewSession() throws Exception
+ {
+   ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+   ZooKeeper zookeeper = connection.getZookeeper();
+   LOG.info("handleNewSession: " + zookeeper.getSessionId());
+
+   synchronized (_callbackMap)
+   {
+     // iterate over snapshots of the key sets: entries are removed inside the
+     // loops, which on the live HashMap views raises ConcurrentModificationException
+     for (String path : new ArrayList<String>(_callbackMap.keySet()))
+     {
+       Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
+           _callbackMap.get(path);
+       if (callbacks == null || callbacks.isEmpty())
+       {
+         LOG.error("Get a null callback map. Remove it. Path: " + path);
+         _callbackMap.remove(path);
+         continue;
+       }
+
+       for (PropertyChangeListener<T> listener
+           : new ArrayList<PropertyChangeListener<T>>(callbacks.keySet()))
+       {
+         ZkCallbackHandler<T> callback = callbacks.get(listener);
+
+         if (callback == null)
+         {
+           LOG.error("Get a null callback. Remove it. Path: " + path + ", listener: "
+               + listener);
+           callbacks.remove(listener);
+           continue;
+         }
+         _zkClient.subscribeDataChanges(path, callback);
+         _zkClient.subscribeChildChanges(path, callback);
+
+         // do initial invocation
+         callback.handleChildChange(path, _zkClient.getChildren(path));
+       }
+     }
+   }
+ }
+
+ /**
+  * Not implemented yet.
+  *
+  * @return always false
+  */
+ @Override
+ public boolean start()
+ {
+   // TODO provide a real start-up sequence
+   final boolean started = false;
+   return started;
+ }
+
+ /**
+  * Closes the underlying zkClient.
+  *
+  * @return always true
+  */
+ @Override
+ public boolean stop()
+ {
+   this._zkClient.close();
+   return true;
+ }
+
+ /**
+  * Data changes are deliberately ignored here; this method does nothing.
+  */
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception
+ {
+   // intentionally empty
+ }
+
+ /**
+  * Drops the data subscription of this store on the deleted path and evicts the
+  * matching entry from the cache.
+  */
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception
+ {
+   final String cacheKey = getRelativePath(dataPath);
+
+   // same monitor as the cache access in getProperty()
+   synchronized (_cache)
+   {
+     _zkClient.unsubscribeDataChanges(dataPath, this);
+     _cache.remove(cacheKey);
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
new file mode 100644
index 0000000..481b5d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
@@ -0,0 +1,110 @@
+package org.apache.helix.store.zk;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * In-memory image of one znode: its path, data, stat and the names of its
+ * children. The child set starts out as the shared immutable empty set and is
+ * swapped for a HashSet the first time a child is added.
+ */
+public class ZNode
+{
+  // used for a newly created item, because zkclient.create() doesn't return stat
+  // or used for places where we don't care about stat
+  public static final Stat ZERO_STAT = new Stat();
+
+  final String _zkPath;
+  private Stat _stat;
+  Object _data;
+  Set<String> _childSet;
+
+  public ZNode(String zkPath, Object data, Stat stat)
+  {
+    this._zkPath = zkPath;
+    this._data = data;
+    this._stat = stat;
+    this._childSet = Collections.<String>emptySet();
+  }
+
+  /** Replaces the shared empty set by a private HashSet when necessary. */
+  private Set<String> mutableChildSet()
+  {
+    if (_childSet == Collections.<String>emptySet())
+    {
+      _childSet = new HashSet<String>();
+    }
+    return _childSet;
+  }
+
+  public void removeChild(String child)
+  {
+    // the shared empty set is immutable and has nothing to remove anyway
+    if (_childSet == Collections.<String>emptySet())
+    {
+      return;
+    }
+    _childSet.remove(child);
+  }
+
+  public void addChild(String child)
+  {
+    mutableChildSet().add(child);
+  }
+
+  /** Adds all names; a null or empty list leaves the node untouched. */
+  public void addChildren(List<String> children)
+  {
+    if (children == null || children.isEmpty())
+    {
+      return;
+    }
+    mutableChildSet().addAll(children);
+  }
+
+  public boolean hasChild(String child)
+  {
+    return _childSet.contains(child);
+  }
+
+  public Set<String> getChildSet()
+  {
+    return _childSet;
+  }
+
+  public void setData(Object data)
+  {
+    this._data = data;
+  }
+
+  public Object getData()
+  {
+    return this._data;
+  }
+
+  public void setStat(Stat stat)
+  {
+    this._stat = stat;
+  }
+
+  public Stat getStat()
+  {
+    return this._stat;
+  }
+
+  /**
+   * Replaces the current children by childNames. A null or empty list leaves the
+   * current children as they are.
+   */
+  public void setChildSet(List<String> childNames)
+  {
+    if (childNames == null || childNames.isEmpty())
+    {
+      return;
+    }
+
+    final Set<String> target = mutableChildSet();
+    target.clear();
+    target.addAll(childNames);
+  }
+
+  @Override
+  public String toString()
+  {
+    return _zkPath + ", " + _data + ", " + _childSet + ", " + _stat;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
new file mode 100644
index 0000000..30b1feb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Bridges zkclient data/child notifications to a {@link PropertyChangeListener}.
+ * On child changes it also subscribes itself to every node of the subtree.
+ */
+class ZkCallbackHandler<T> implements IZkChildListener, IZkDataListener
+{
+  private static Logger LOG = Logger.getLogger(ZkCallbackHandler.class);
+
+  private final ZkClient _zkClient;
+  private final ZKPropertyStore<T> _store;
+
+  // listen on prefix and all its childs
+  private final String _prefix;
+  private final PropertyChangeListener<T> _listener;
+
+  public ZkCallbackHandler(ZkClient client, ZKPropertyStore<T> store, String prefix,
+      PropertyChangeListener<T> listener)
+  {
+    this._zkClient = client;
+    this._store = store;
+    this._prefix = prefix;
+    this._listener = listener;
+  }
+
+  /** Forwards the change to the listener using the store-relative key. */
+  @Override
+  public void handleDataChange(String path, Object data) throws Exception
+  {
+    LOG.debug("Data changed @ " + path + " to " + data);
+    _listener.onPropertyChange(_store.getRelativePath(path));
+  }
+
+  /** Deletions are only logged. */
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception
+  {
+    LOG.debug("Data deleted @ " + dataPath);
+  }
+
+  /**
+   * Notifies the listener about path, then subscribes to each child and
+   * recurses into it with that child's current children.
+   */
+  @Override
+  public void handleChildChange(String path, List<String> currentChilds) throws Exception
+  {
+    LOG.debug("childs changed @ " + path + " to " + currentChilds);
+
+    if (currentChilds == null)
+    {
+      // When a node with a child change watcher is deleted, a child change is
+      // triggered on the deleted node and currentChilds is null in that case.
+      return;
+    }
+
+    final String key = _store.getRelativePath(path);
+    _listener.onPropertyChange(key);
+
+    // avoid a double slash when path already ends with one
+    final String parent = path.endsWith("/") ? path : path + "/";
+    for (String child : currentChilds)
+    {
+      final String childPath = parent + child;
+      _zkClient.subscribeDataChanges(childPath, this);
+      _zkClient.subscribeChildChanges(childPath, this);
+
+      // descend into the child's own subtree
+      handleChildChange(childPath, _zkClient.getChildren(childPath));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
new file mode 100644
index 0000000..56e636a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
@@ -0,0 +1,31 @@
+package org.apache.helix.store.zk;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+
+
+/**
+ * Property store built on {@link ZkCacheBaseDataAccessor}; every constructor
+ * forwards to the superclass and passes null for the argument it does not expose.
+ */
+public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T>
+{
+  /** Wraps an existing accessor. */
+  public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor,
+      String root,
+      List<String> subscribedPaths)
+  {
+    super(accessor, root, null, subscribedPaths);
+  }
+
+  /** Creates the store from a ZooKeeper address with the given cache paths. */
+  public ZkHelixPropertyStore(String zkAddress,
+      ZkSerializer serializer,
+      String chrootPath,
+      List<String> zkCachePaths)
+  {
+    super(zkAddress, serializer, chrootPath, null, zkCachePaths);
+  }
+
+  /** Creates the store from a ZooKeeper address without cache paths. */
+  public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath)
+  {
+    this(zkAddress, serializer, chrootPath, null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
new file mode 100644
index 0000000..3a561a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
@@ -0,0 +1,10 @@
+package org.apache.helix.store.zk;
+
+/**
+ * Callback interface with one notification method per kind of event on a path.
+ * NOTE(review): no caller is visible here; the triggering conditions below are
+ * inferred from the method names only — confirm against the code that fires them.
+ */
+public interface ZkListener
+{
+ // presumably invoked when the data stored at path changes — TODO confirm
+ void handleDataChange(String path);
+
+ // presumably invoked when a node is created at path — TODO confirm
+ void handleNodeCreate(String path);
+
+ // presumably invoked when the node at path is deleted — TODO confirm
+ void handleNodeDelete(String path);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java b/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
new file mode 100644
index 0000000..0675eac
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix implementation of zookeeper-based property store (Deprecated)
+ *
+ */
+package org.apache.helix.store.zk;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
new file mode 100644
index 0000000..125c877
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.spi.ErrorCode;
+
+/**
+ * File appender that, whenever it is activated with a file name set, replaces
+ * that name by a freshly generated, timestamped one under
+ * {@code <user.home>/EspressoLogs/}.
+ */
+public class CLMLogFileAppender extends FileAppender
+{
+  public CLMLogFileAppender()
+  {
+  }
+
+  public CLMLogFileAppender(Layout layout, String filename, boolean append,
+      boolean bufferedIO, int bufferSize) throws IOException
+  {
+    super(layout, filename, append, bufferedIO, bufferSize);
+  }
+
+  public CLMLogFileAppender(Layout layout, String filename, boolean append)
+      throws IOException
+  {
+    super(layout, filename, append);
+  }
+
+  public CLMLogFileAppender(Layout layout, String filename) throws IOException
+  {
+    super(layout, filename);
+  }
+
+  /**
+   * Switches the appender to a newly named log file. Nothing happens when no
+   * file name has been configured; failures are reported to the error handler
+   * instead of being thrown.
+   */
+  @Override
+  public void activateOptions()
+  {
+    if (fileName == null)
+    {
+      return;
+    }
+
+    try
+    {
+      fileName = getNewLogFileName();
+      setFile(fileName, fileAppend, bufferedIO, bufferSize);
+    }
+    catch (Exception e)
+    {
+      errorHandler.error("Error while activating log options", e,
+          ErrorCode.FILE_OPEN_FAILURE);
+    }
+  }
+
+  /**
+   * Builds {@code <user.home>/EspressoLogs/<class>_<timestamp>.txt}, where class
+   * is the class name of the bottom-most frame of the current thread's stack.
+   */
+  private String getNewLogFileName()
+  {
+    Calendar cal = Calendar.getInstance();
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
+    String time = sdf.format(cal.getTime());
+
+    // the last stack element is the outermost caller of the current thread
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    StackTraceElement main = stack[stack.length - 1];
+    String mainClass = main.getClassName();
+
+    return System.getProperty("user.home") + "/EspressoLogs/" + mainClass + "_"
+        + time + ".txt";
+  }
+}