You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[23/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java b/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
deleted file mode 100644
index 0803877..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-
-/**
- * {@link PropertySerializer} for {@link ZNRecord} values that hands all work to a
- * {@link ZNRecordSerializer}.
- */
-public class ZNRecordJsonSerializer implements PropertySerializer<ZNRecord>
-{
-  // the logger reference is never reassigned, so it is declared final
-  private static final Logger LOG = Logger.getLogger(ZNRecordJsonSerializer.class);
-  private final ZNRecordSerializer _serializer = new ZNRecordSerializer();
-
-  /**
-   * Serializes a record through the wrapped {@link ZNRecordSerializer}.
-   *
-   * @param data record to serialize
-   * @return the bytes produced by the wrapped serializer
-   */
-  @Override
-  public byte[] serialize(ZNRecord data) throws PropertyStoreException
-  {
-    return _serializer.serialize(data);
-  }
-
-  /**
-   * Deserializes bytes through the wrapped {@link ZNRecordSerializer}.
-   *
-   * @param bytes serialized form of a record
-   * @return the wrapped serializer's result, cast to {@link ZNRecord}
-   */
-  @Override
-  public ZNRecord deserialize(byte[] bytes) throws PropertyStoreException
-  {
-    return (ZNRecord) _serializer.deserialize(bytes);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
deleted file mode 100644
index 7b3c959..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
+++ /dev/null
@@ -1,293 +0,0 @@
-package com.linkedin.helix.store.file;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.HelixPropertyStore;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStoreException;
-
-public class FileHelixPropertyStore<T> implements HelixPropertyStore<T>
-{
- final FilePropertyStore<T> _store;
-
- /**
-  * Builds a helix property store on top of a newly created {@link FilePropertyStore}.
-  *
-  * @param serializer    handed to the underlying file store
-  * @param rootNamespace handed to the underlying file store
-  * @param comparator    handed to the underlying file store
-  */
- public FileHelixPropertyStore(final PropertySerializer<T> serializer,
-                               String rootNamespace,
-                               final PropertyJsonComparator<T> comparator)
- {
-   this._store = new FilePropertyStore<T>(serializer, rootNamespace, comparator);
- }
-
- /**
-  * Creates the property at {@code path}. Creation is not distinguished from
-  * overwriting here: the call is forwarded to {@code set(path, record, options)}.
-  *
-  * @return whatever {@code set} returns
-  */
- @Override
- public boolean create(String path, T record, int options)
- {
-   final boolean stored = set(path, record, options);
-   return stored;
- }
-
- /**
-  * Writes {@code record} to {@code path} in the underlying file store.
-  *
-  * @return {@code true} when the store call returned normally, {@code false} when it
-  *         threw a {@link PropertyStoreException} (the stack trace is printed)
-  */
- @Override
- public boolean set(String path, T record, int options)
- {
-   boolean written;
-   try
-   {
-     _store.setProperty(path, record);
-     written = true;
-   }
-   catch (PropertyStoreException e)
-   {
-     e.printStackTrace();
-     written = false;
-   }
-   return written;
- }
-
- /**
-  * Applies {@code updater} to the property at {@code path} through the underlying store.
-  *
-  * @return always {@code true}; the underlying call reports no result
-  */
- @Override
- public boolean update(String path, DataUpdater<T> updater, int options)
- {
-   this._store.updatePropertyUntilSucceed(path, updater);
-   return true;
- }
-
- /**
-  * Removes the property at {@code path} from the underlying file store.
-  *
-  * @return {@code true} when the store call returned normally, {@code false} when it
-  *         threw a {@link PropertyStoreException} (the stack trace is printed)
-  */
- @Override
- public boolean remove(String path, int options)
- {
-   boolean removed;
-   try
-   {
-     _store.removeProperty(path);
-     removed = true;
-   }
-   catch (PropertyStoreException e)
-   {
-     e.printStackTrace();
-     removed = false;
-   }
-   return removed;
- }
-
- /**
-  * Creates several properties at once by forwarding to {@code setChildren}.
-  *
-  * @return one flag per path, as produced by {@code setChildren}
-  */
- @Override
- public boolean[] createChildren(List<String> paths, List<T> records, int options)
- {
-   final boolean[] outcome = setChildren(paths, records, options);
-   return outcome;
- }
-
- /**
-  * Stores {@code records.get(i)} at {@code paths.get(i)} for every index of {@code paths}.
-  *
-  * @return one flag per path, in path order, holding the result of the individual write
-  */
- @Override
- public boolean[] setChildren(List<String> paths, List<T> records, int options)
- {
-   boolean[] outcome = new boolean[paths.size()];
-   int index = 0;
-   while (index < paths.size())
-   {
-     outcome[index] = create(paths.get(index), records.get(index), options);
-     index++;
-   }
-   return outcome;
- }
-
- /**
-  * Applies {@code updaters.get(i)} to {@code paths.get(i)} for every index of {@code paths}.
-  *
-  * @return one flag per path, in path order, holding the result of the individual update
-  */
- @Override
- public boolean[] updateChildren(List<String> paths,
-                                 List<DataUpdater<T>> updaters,
-                                 int options)
- {
-   boolean[] outcome = new boolean[paths.size()];
-   int index = 0;
-   while (index < paths.size())
-   {
-     outcome[index] = update(paths.get(index), updaters.get(index), options);
-     index++;
-   }
-   return outcome;
- }
-
- /**
-  * Removes every path in {@code paths}, one at a time.
-  *
-  * @return one flag per path, in path order, holding the result of the individual removal
-  */
- @Override
- public boolean[] remove(List<String> paths, int options)
- {
-   boolean[] outcome = new boolean[paths.size()];
-   int index = 0;
-   while (index < paths.size())
-   {
-     outcome[index] = remove(paths.get(index), options);
-     index++;
-   }
-   return outcome;
- }
-
- /**
-  * Reads the property stored at {@code path}.
-  *
-  * @param stat when non-null, receives the version and last-modified time reported by
-  *             the underlying store
-  * @return the stored value, or {@code null} when the store threw a
-  *         {@link PropertyStoreException} (the stack trace is printed)
-  */
- @Override
- public T get(String path, Stat stat, int options)
- {
-   final PropertyStat fileStat = new PropertyStat();
-   try
-   {
-     final T stored = _store.getProperty(path, fileStat);
-     if (null != stat)
-     {
-       stat.setVersion(fileStat.getVersion());
-       stat.setMtime(fileStat.getLastModifiedTime());
-     }
-     return stored;
-   }
-   catch (PropertyStoreException e)
-   {
-     e.printStackTrace();
-     return null;
-   }
- }
-
- /**
-  * Reads the properties stored at {@code paths}, in order.
-  *
-  * @param paths   paths to read
-  * @param stats   optional; may be {@code null}. An entry already present at index
-  *                {@code i} is filled in place, exactly as before. When the list is
-  *                shorter than {@code paths}, a new {@link Stat} is appended for each
-  *                missing index instead of failing with an index error.
-  * @param options forwarded to the single-path {@code get}
-  * @return one value per path, in path order; individual entries may be {@code null}
-  */
- @Override
- public List<T> get(List<String> paths, List<Stat> stats, int options)
- {
-   List<T> values = new ArrayList<T>(paths.size());
-   for (int i = 0; i < paths.size(); i++)
-   {
-     Stat stat = null;
-     if (stats != null)
-     {
-       if (i < stats.size())
-       {
-         // caller supplied a slot for this index: keep filling it in place
-         stat = stats.get(i);
-       }
-       else
-       {
-         // callers such as getChildren cannot know the number of paths up front
-         stat = new Stat();
-         stats.add(stat);
-       }
-     }
-     values.add(get(paths.get(i), stat, options));
-   }
-   return values;
- }
-
- /**
-  * Reads the values of the properties found under {@code parentPath}.
-  *
-  * @param parentPath path whose properties are read
-  * @param stats      forwarded to the multi-path {@code get}
-  * @param options    forwarded to {@code getChildNames} and {@code get}
-  * @return the values in the order of {@code getChildNames}, or {@code null} when
-  *         {@code getChildNames} returns {@code null}
-  */
- @Override
- public List<T> getChildren(String parentPath, List<Stat> stats, int options)
- {
-   List<String> childNames = getChildNames(parentPath, options);
-
-   if (childNames == null)
-   {
-     return null;
-   }
-
-   List<String> paths = new ArrayList<String>(childNames.size());
-   for (String childName : childNames)
-   {
-     // FilePropertyStore.getPropertyNames() reports names as paths relative to the store
-     // root (leading "/"), so they already contain parentPath; prefixing it again would
-     // produce a path that does not exist. Only bare names are joined to the parent.
-     if (childName.startsWith("/"))
-     {
-       paths.add(childName);
-     }
-     else
-     {
-       paths.add(parentPath + "/" + childName);
-     }
-   }
-
-   return get(paths, stats, options);
- }
-
- /**
-  * Lists the property names the underlying store reports for {@code parentPath}.
-  *
-  * @return the store's result, or an empty list when the store threw a
-  *         {@link PropertyStoreException} (the stack trace is printed)
-  */
- @Override
- public List<String> getChildNames(String parentPath, int options)
- {
-   List<String> names;
-   try
-   {
-     names = _store.getPropertyNames(parentPath);
-   }
-   catch (PropertyStoreException e)
-   {
-     e.printStackTrace();
-     names = Collections.emptyList();
-   }
-   return names;
- }
-
- /**
-  * Asks the underlying file store whether {@code path} exists.
-  */
- @Override
- public boolean exists(String path, int options)
- {
-   final boolean present = _store.exists(path);
-   return present;
- }
-
- /**
-  * Checks every path in {@code paths} for existence.
-  *
-  * @return one flag per path, in path order
-  */
- @Override
- public boolean[] exists(List<String> paths, int options)
- {
-   boolean[] present = new boolean[paths.size()];
-   int index = 0;
-   while (index < paths.size())
-   {
-     present[index] = exists(paths.get(index), options);
-     index++;
-   }
-   return present;
- }
-
- /**
-  * Collects a {@link Stat} for every path in {@code paths}.
-  *
-  * @return one stat per path, in path order, each obtained from {@code getStat}
-  */
- @Override
- public Stat[] getStats(List<String> paths, int options)
- {
-   Stat[] collected = new Stat[paths.size()];
-   int index = 0;
-   while (index < paths.size())
-   {
-     collected[index] = getStat(paths.get(index), options);
-     index++;
-   }
-   return collected;
- }
-
- /**
-  * Returns a fresh {@link Stat} populated by a read of {@code path}.
-  * The value that was read is discarded; only the stat is returned.
-  */
- @Override
- public Stat getStat(String path, int options)
- {
-   final Stat result = new Stat();
-   // get() fills the stat as a side effect
-   get(path, result, options);
-   return result;
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void subscribeDataChanges(String path, IZkDataListener listener)
- {
-   final String reason = "subscribeDataChanges not supported";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void unsubscribeDataChanges(String path, IZkDataListener listener)
- {
-   final String reason = "unsubscribeDataChanges not supported";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public List<String> subscribeChildChanges(String path, IZkChildListener listener)
- {
-   final String reason = "subscribeChildChanges not supported";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void unsubscribeChildChanges(String path, IZkChildListener listener)
- {
-   final String reason = "unsubscribeChildChanges not supported";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Starts the underlying file store; its return value is ignored.
-  */
- @Override
- public void start()
- {
-   this._store.start();
- }
-
- /**
-  * Stops the underlying file store; its return value is ignored.
-  */
- @Override
- public void stop()
- {
-   this._store.stop();
- }
-
- /**
-  * Registers {@code listener} for changes under {@code parentPath}. Each property-change
-  * callback from the underlying store is forwarded to {@code listener.onDataChange(key)}.
-  * A {@link PropertyStoreException} from the store is printed and otherwise ignored.
-  */
- @Override
- public void subscribe(String parentPath, final HelixPropertyListener listener)
- {
-   // adapter from the store's listener type to the helix listener type
-   final PropertyChangeListener<T> adapter = new PropertyChangeListener<T>()
-   {
-     @Override
-     public void onPropertyChange(String key)
-     {
-       listener.onDataChange(key);
-     }
-   };
-
-   try
-   {
-     _store.subscribeForPropertyChange(parentPath, adapter);
-   }
-   catch (PropertyStoreException e)
-   {
-     e.printStackTrace();
-   }
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void unsubscribe(String parentPath, HelixPropertyListener listener)
- {
-   final String reason = "unsubscribe not supported";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Does nothing; this store has no reset behaviour implemented.
-  */
- @Override
- public void reset()
- {
-   // intentionally empty
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
deleted file mode 100644
index 0ee1a29..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
+++ /dev/null
@@ -1,942 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.file;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.io.DirectoryWalker;
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.manager.file.FileCallbackHandler;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-
-/**
- *
- * A property store built on top of a file system.
- * Since file systems usually have sophisticated cache mechanisms,
- * there is no need for another cache in the file property store.
- *
- * NOTES:
- * The lastModified timestamp provided by Java file I/O has only second-level precision,
- * so it is possible for a file to be modified without its lastModified timestamp changing.
- * The solution is to use a map that caches the files changed in the last second
- * and, in the next round of refresh, to check against this map to figure out whether a file
- * has been changed/created in the last second.
- * @see <a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6939260">bug 6939260</a>
- *
- * @author zzhang
- * @param <T>
- */
-
-public class FilePropertyStore<T> implements PropertyStore<T>
-{
- private static Logger logger = Logger.getLogger(FilePropertyStore.class);
-
- // key that denotes the root of the store; getRelativePath() returns it for the root itself
- private final String ROOT = "/";
- // how long start() waits for the first refresh pass; passed with TimeUnit.SECONDS
- private final long TIMEOUT = 30L;
- private final long REFRESH_PERIOD = 1000; // ms
- // random id, used in the refresh thread's name and in the start-up log line
- private final int _id = new Random().nextInt();
- // root directory of the store; always begins with exactly one "/" (see constructor)
- private final String _rootNamespace;
- // replaceable through setPropertySerializer()
- private PropertySerializer<T> _serializer;
- private final PropertyJsonComparator<T> _comparator;
-
- private Thread _refreshThread;
- // polled by the refresh loop; set to true by stop()
- private final AtomicBoolean _stopRefreshThread;
- // counted down by the refresh loop after each directory walk; start() waits on it
- private final CountDownLatch _firstRefreshCounter;
- private final ReadWriteLock _readWriteLock;
-
- // values of the files seen at the newest timestamp: previous walk / walk in progress
- private Map<String, T> _lastModifiedFiles = new HashMap<String, T>();
- private Map<String, T> _curModifiedFiles = new HashMap<String, T>();
-
- private final Map< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > _fileChangeListeners; // map key to listener
-
- private class FilePropertyStoreRefreshThread implements Runnable
- {
- private final PropertyStoreDirWalker _dirWalker;
-
- public class PropertyStoreDirWalker extends DirectoryWalker
- {
- private final File _propertyStoreRootDir;
- private long _lastNotifiedTime = 0;
- private long _currentHighWatermark;
-
- /**
-  * @param rootNamespace directory at which every walk starts
-  * @param readWriteLock not used by this constructor
-  */
- public PropertyStoreDirWalker(String rootNamespace, ReadWriteLock readWriteLock)
- {
-   this._propertyStoreRootDir = new File(rootNamespace);
- }
-
- /**
-  * Walker callback for one file: decides whether the file counts as changed since the
-  * previous walk and, if so, adds it to {@code results}.
-  *
-  * @param file    file being visited
-  * @param depth   not used
-  * @param results receives every file that is treated as changed
-  */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- protected void handleFile(File file, int depth, Collection results) throws IOException
- {
- // older than the newest timestamp handled by the previous walk: nothing to do
- if (file.lastModified() < _lastNotifiedTime)
- {
- return;
- }
-
- String path = getRelativePath(file.getAbsolutePath());
- // stays null when the read fails; the failure is only logged
- T newValue = null;
- try
- {
- newValue = getProperty(path);
- } catch (PropertyStoreException e)
- {
- logger.error("fail to get property, path:" + path, e);
- }
-
- // same timestamp as the previous walk's newest one and the file was recorded then:
- // the timestamp alone cannot tell whether it changed, so compare the values
- if (file.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
- {
- T value = _lastModifiedFiles.get(path);
-
- if (_comparator.compare(value, newValue) == 0)
- {
- // unchanged: remember it for the next walk if it is still at the newest timestamp,
- // and leave without reporting it
- if (file.lastModified() == _currentHighWatermark)
- {
- _curModifiedFiles.put(path, newValue);
- }
- return;
- }
- }
-
- // track the newest timestamp of this walk; the current map only keeps entries that
- // carry exactly that timestamp
- if (file.lastModified() > _currentHighWatermark)
- {
- _currentHighWatermark = file.lastModified();
-
- _curModifiedFiles.clear();
- _curModifiedFiles.put(path, newValue);
- }
- else if (file.lastModified() == _currentHighWatermark)
- {
- _curModifiedFiles.put(path, newValue);
- }
-
- // debug
-// logger.error("file: " + file.getAbsolutePath() + " changed@" + file.lastModified() + " (" +
-// new Date(file.lastModified()) + ")");
- results.add(file);
- }
-
- /**
-  * Walker callback for one directory: decides whether the directory counts as changed
-  * since the previous walk and, if so, adds it to {@code results}.
-  *
-  * @param dir     directory being visited
-  * @param depth   not used
-  * @param results receives every directory that is treated as changed
-  * @return always {@code true}
-  */
- @Override
- protected boolean handleDirectory(File dir, int depth, Collection results) throws IOException
- {
- // older than the newest timestamp handled by the previous walk: nothing to report
- if (dir.lastModified() < _lastNotifiedTime)
- {
- return true;
- }
-
- String path = getRelativePath(dir.getAbsolutePath());
- // stays null when the read fails; the failure is only logged
- T newValue = null;
- try
- {
- newValue = getProperty(path);
- }
- catch (PropertyStoreException e)
- {
- logger.error("fail to get property, path:" + path, e);
- }
-
- // same timestamp as the previous walk's newest one and the directory was recorded then:
- // compare the values to decide whether it really changed
- if (dir.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
- {
- T value = _lastModifiedFiles.get(path);
- if (_comparator.compare(value, newValue) == 0)
- {
- if (dir.lastModified() == _currentHighWatermark)
- {
- _curModifiedFiles.put(path, newValue);
- }
- return true;
- }
- }
- // NOTE(review): unlike handleFile, the entry is recorded here before the watermark
- // comparison, so it stays in the map even when the directory's timestamp is below
- // the current watermark - confirm this is intended
- _curModifiedFiles.put(path, newValue);
-
- // track the newest timestamp of this walk
- if (dir.lastModified() > _currentHighWatermark)
- {
- _currentHighWatermark = dir.lastModified();
-
- _curModifiedFiles.clear();
- _curModifiedFiles.put(path, newValue);
- }
- else if (dir.lastModified() == _currentHighWatermark)
- {
- _curModifiedFiles.put(path, newValue);
- }
-
- logger.debug("dir: " + dir.getAbsolutePath() + " changed@" + dir.lastModified() +
- " (" + new Date(dir.lastModified()) + ")");
- results.add(dir);
-
- return true;
- }
-
- /**
-  * Performs one refresh pass: walks the store's root directory while holding the read
-  * lock, collects the entries that handleFile/handleDirectory report as changed, then
-  * notifies the registered listeners about each of them in sorted path order.
-  */
- public void walk()
- {
- HashSet<File> files = new HashSet<File>();
-
- try
- {
- // this pass starts from the newest timestamp of the previous pass
- _currentHighWatermark = _lastNotifiedTime;
- _readWriteLock.readLock().lock();
- super.walk(_propertyStoreRootDir, files);
- }
- catch (IOException e)
- {
- logger.error("IO exception when walking through dir:" + _propertyStoreRootDir, e);
- }
- finally
- {
- _lastNotifiedTime = _currentHighWatermark;
- // swap the two maps: what this pass recorded becomes "last", and the emptied
- // previous map is reused as "current" for the next pass
- _lastModifiedFiles.clear();
-
- Map<String, T> temp = _lastModifiedFiles;
- _lastModifiedFiles = _curModifiedFiles;
- _curModifiedFiles = temp;
- _readWriteLock.readLock().unlock();
- }
-
- // TODO see if we can use DirectoryFileComparator.DIRECTORY_COMPARATOR.sort()
- // order the changed entries by absolute path
- File[] fileArray = new File[files.size()];
- fileArray = files.toArray(fileArray);
- Arrays.sort(fileArray, new Comparator<File>() {
-
- @Override
- public int compare(File file1, File file2)
- {
- return file1.getAbsoluteFile().compareTo(file2.getAbsoluteFile());
- }
-
- });
-
-
- // notify listeners
- for (int i = 0; i < fileArray.length; i++)
- {
- File file = fileArray[i];
-
- // debug
-// logger.error("Before send notification of " + file.getAbsolutePath() + " to listeners " + _fileChangeListeners);
-
- for (Map.Entry< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > entry : _fileChangeListeners.entrySet())
- {
- String absPath = file.getAbsolutePath();
- // NOTE(review): plain string-prefix test, so a listener registered on ".../ab"
- // is also notified for ".../abc" - confirm this is intended
- if (absPath.startsWith(entry.getKey()))
- {
- for (PropertyChangeListener<T> listener : entry.getValue())
- {
- // the cast result is only referenced by the commented-out debug line below
- if (listener instanceof FileCallbackHandler)
- {
- FileCallbackHandler handler = (FileCallbackHandler) listener;
-
- // debug
-// logger.error("Send notification of " + file.getAbsolutePath() + " to listener:" + handler.getListener());
- }
- // listeners receive the path relative to the store root
- listener.onPropertyChange(getRelativePath(absPath));
- }
- }
- }
- }
- }
- }
-
- /**
-  * Creates the refresh task together with a directory walker rooted at the store's
-  * root namespace.
-  *
-  * @param readWriteLock handed on to the walker's constructor
-  */
- public FilePropertyStoreRefreshThread(ReadWriteLock readWriteLock)
- {
-   this._dirWalker = new PropertyStoreDirWalker(_rootNamespace, readWriteLock);
- }
-
- /**
-  * Refresh loop: walks the directory tree, counts down the first-refresh latch, sleeps
-  * for {@code REFRESH_PERIOD} milliseconds and repeats until the stop flag is set.
-  * An interrupt during the sleep is ignored; the loop simply continues.
-  */
- @Override
- public void run()
- {
-   for (;;)
-   {
-     if (_stopRefreshThread.get())
-     {
-       break;
-     }
-
-     _dirWalker.walk();
-     _firstRefreshCounter.countDown();
-
-     try
-     {
-       Thread.sleep(REFRESH_PERIOD);
-     }
-     catch (InterruptedException ie)
-     {
-       // do nothing
-     }
-   }
-
-   logger.info("Quitting file property store refresh thread");
- }
-
- }
-
-// public FilePropertyStore(final PropertySerializer<T> serializer)
-// {
-// this(serializer, System.getProperty("java.io.tmpdir"));
-// }
-
- /**
-  * Creates a file-backed property store and makes sure its root directory exists.
-  *
-  * @param serializer    converts values to and from the bytes stored in files
-  * @param rootNamespace root directory of the store; any number of leading slashes is
-  *                      normalised to exactly one
-  * @param comparator    used by the refresh walker to compare stored values
-  */
- public FilePropertyStore(final PropertySerializer<T> serializer, String rootNamespace,
-                          final PropertyJsonComparator<T> comparator)
- {
-   _serializer = serializer;
-   _comparator = comparator;
-   _stopRefreshThread = new AtomicBoolean(false);
-   _firstRefreshCounter = new CountDownLatch(1);
-   _readWriteLock = new ReentrantReadWriteLock();
-
-   _fileChangeListeners = new ConcurrentHashMap< String, CopyOnWriteArraySet<PropertyChangeListener<T> > >();
-
-   // skip over every leading slash, then put a single one back
-   int firstNonSlash = 0;
-   while (firstNonSlash < rootNamespace.length() && rootNamespace.charAt(firstNonSlash) == '/')
-   {
-     firstNonSlash++;
-   }
-   _rootNamespace = "/" + rootNamespace.substring(firstNonSlash);
-
-   createRootNamespace();
- }
-
-
- @Override
- public boolean start()
- {
- // check if start has already been invoked
- if (_firstRefreshCounter.getCount() == 0)
- return true;
-
- logger.debug("starting file property store polling thread, id=" + _id);
-
- _stopRefreshThread.set(false);
- _refreshThread = new Thread(new FilePropertyStoreRefreshThread(_readWriteLock),
- "FileRefreshThread_" + _id);
- // _refreshThread.setDaemon(true);
- _refreshThread.start();
-
- try
- {
- boolean timeout = !_firstRefreshCounter.await(TIMEOUT, TimeUnit.SECONDS);
- if (timeout)
- {
- throw new Exception("Timeout while waiting for the first refresh to complete");
- }
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
- return true;
- }
-
- /**
-  * Signals the refresh thread to stop and waits for it to finish. Only the call that
-  * actually flips the stop flag waits; later calls return immediately. If the wait is
-  * interrupted, the interrupt is restored on the calling thread.
-  *
-  * @return always {@code true}
-  */
- @Override
- public boolean stop()
- {
-   if (_stopRefreshThread.compareAndSet(false, true) && _refreshThread != null)
-   {
-     try
-     {
-       _refreshThread.join();
-     }
-     catch (InterruptedException e)
-     {
-       // keep the interrupt visible to the caller instead of dropping it
-       Thread.currentThread().interrupt();
-     }
-   }
-   return true;
- }
-
- /**
-  * Maps a property key to a path below the root namespace.
-  *
-  * @param key property key; leading slashes are ignored
-  * @return the root namespace itself for an empty key, otherwise root + "/" + key
-  */
- private String getPath(String key)
- {
-   // skip every leading slash
-   int start = 0;
-   while (start < key.length() && key.charAt(start) == '/')
-   {
-     start++;
-   }
-   String relative = key.substring(start);
-
-   if (relative.equals(""))
-   {
-     return _rootNamespace;
-   }
-   return _rootNamespace + "/" + relative;
- }
-
- /**
-  * Converts a path under the root namespace back to a key.
-  *
-  * @param path path to convert
-  * @return {@code ROOT} for the root namespace itself; the path with the root prefix cut
-  *         off when it starts with the root namespace; otherwise the path unchanged
-  *         (a warning is logged)
-  */
- private String getRelativePath(String path)
- {
-   if (path.startsWith(_rootNamespace))
-   {
-     return path.equals(_rootNamespace) ? ROOT : path.substring(_rootNamespace.length());
-   }
-
-   logger.warn("path does NOT start with: " + _rootNamespace);
-   return path;
- }
-
- /**
-  * Creates the directory of the root namespace by calling
-  * {@code createPropertyNamespace} with the root key.
-  */
- public void createRootNamespace()
- {
-   this.createPropertyNamespace(ROOT);
- }
-
- /**
-  * Creates the directory for {@code prefix}, including missing parents, while holding
-  * the write lock. An existing directory and a failed creation are both only logged.
-  *
-  * @param prefix namespace key, resolved below the root namespace
-  */
- @Override
- public void createPropertyNamespace(String prefix)
- {
-   String path = getPath(prefix);
-   File dir = new File(path);
-
-   // acquire before entering try so that finally never unlocks a lock that is not held
-   _readWriteLock.writeLock().lock();
-   try
-   {
-     if (dir.exists())
-     {
-       logger.warn(path + " already exists");
-     }
-     else if (!dir.mkdirs())
-     {
-       logger.warn("Failed to create: " + path);
-     }
-   }
-   catch (Exception e)
-   {
-     // pass the exception itself so the stack trace ends up in the log
-     logger.error("Failed to create dir: " + path, e);
-   }
-   finally
-   {
-     _readWriteLock.writeLock().unlock();
-   }
- }
-
- /**
-  * Writes the serialized form of {@code value} to the file for {@code key}, replacing
-  * any previous content. The file is created when missing. I/O failures are logged and
-  * not propagated.
-  *
-  * @param key   property key, resolved below the root namespace
-  * @param value value to store
-  * @throws PropertyStoreException declared by the interface; the serializer may throw it
-  */
- @Override
- public void setProperty(String key, T value) throws PropertyStoreException
- {
-   String path = getPath(key);
-   File file = new File(path);
-   FileOutputStream fout = null;
-
-   // TODO create non-exist dirs recursively
-   // acquire before entering try so that finally never unlocks a lock that is not held
-   _readWriteLock.writeLock().lock();
-   try
-   {
-     // serialize BEFORE opening the stream: opening truncates the file, so a serializer
-     // failure after that point would wipe the previously stored value
-     byte[] bytes = _serializer.serialize(value);
-
-     if (!file.exists())
-     {
-       FileUtils.touch(file);
-     }
-
-     // TODO need a timeout on lock operation
-     fout = new FileOutputStream(file);
-     fout.write(bytes);
-   }
-   catch (IOException e)
-   {
-     logger.error("fail to set property. key:" + key +
-                  "value:" + value, e);
-   }
-   finally
-   {
-     _readWriteLock.writeLock().unlock();
-     try
-     {
-       if (fout != null)
-       {
-         fout.close();
-       }
-     }
-     catch (IOException e)
-     {
-       logger.error("fail to close file. key:" + key, e);
-     }
-   }
- }
-
- /**
-  * Reads the value stored for {@code key} without collecting any stat information.
-  *
-  * @return the result of {@code getProperty(key, null)}
-  */
- @Override
- public T getProperty(String key) throws PropertyStoreException
- {
-   final PropertyStat noStat = null;
-   return getProperty(key, noStat);
- }
-
- /**
-  * Reads and deserializes the value stored for {@code key} while holding the read lock.
-  *
-  * @param key          property key, resolved below the root namespace
-  * @param propertyStat when non-null, receives the file's last-modified time
-  * @return the deserialized value, or {@code null} when the file does not exist, cannot
-  *         be opened, is empty, or an I/O error occurs (the error is logged)
-  * @throws PropertyStoreException declared by the interface; the serializer may throw it
-  */
- @Override
- public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
- {
-   String path = getPath(key);
-   File file = new File(path);
-   FileInputStream fin = null;
-
-   // TODO need a timeout on lock operation
-   // acquire before entering try so that finally never unlocks a lock that is not held
-   _readWriteLock.readLock().lock();
-   try
-   {
-     if (!file.exists())
-     {
-       return null;
-     }
-
-     fin = new FileInputStream(file);
-
-     int availableBytes = fin.available();
-     if (availableBytes == 0)
-     {
-       return null;
-     }
-
-     // a single read() may deliver fewer bytes than requested, so keep reading until
-     // the buffer is full or the stream ends
-     byte[] bytes = new byte[availableBytes];
-     int filled = 0;
-     while (filled < bytes.length)
-     {
-       int count = fin.read(bytes, filled, bytes.length - filled);
-       if (count < 0)
-       {
-         break;
-       }
-       filled += count;
-     }
-     if (filled == 0)
-     {
-       return null;
-     }
-     if (filled < bytes.length)
-     {
-       // the stream ended early: hand the serializer only what was actually read
-       bytes = Arrays.copyOf(bytes, filled);
-     }
-
-     if (propertyStat != null)
-     {
-       propertyStat.setLastModifiedTime(file.lastModified());
-     }
-
-     return _serializer.deserialize(bytes);
-   }
-   catch (FileNotFoundException e)
-   {
-     return null;
-   }
-   catch (IOException e)
-   {
-     logger.error("fail to get property. key:" + key, e);
-   }
-   finally
-   {
-     _readWriteLock.readLock().unlock();
-     try
-     {
-       if (fin != null)
-       {
-         fin.close();
-       }
-     }
-     catch (IOException e)
-     {
-       logger.error("fail to close file. key:" + key, e);
-     }
-   }
-
-   return null;
- }
-
- /**
-  * Deletes the file for {@code key} while holding the write lock. A missing file is not
-  * an error; a failed deletion or any exception is only logged.
-  *
-  * @param key property key, resolved below the root namespace
-  */
- @Override
- public void removeProperty(String key) throws PropertyStoreException
- {
-   final String path = getPath(key);
-   final File target = new File(path);
-
-   try
-   {
-     _readWriteLock.writeLock().lock();
-     if (target.exists() && !target.delete())
-     {
-       logger.error("fail to remove file. path:" + path);
-     }
-   }
-   catch (Exception e)
-   {
-     logger.error("fail to remove file. path:" + path, e);
-   }
-   finally
-   {
-     _readWriteLock.writeLock().unlock();
-   }
- }
-
- /**
-  * Removes the whole root namespace by calling {@code removeNamespace} with the root key.
-  */
- public void removeRootNamespace() throws PropertyStoreException
- {
-   this.removeNamespace(ROOT);
- }
-
- /**
-  * Deletes the directory for {@code prefix} together with its content while holding the
-  * write lock. An I/O failure is only logged.
-  *
-  * @param prefix namespace key, resolved below the root namespace
-  */
- @Override
- public void removeNamespace(String prefix) throws PropertyStoreException
- {
-   final String path = getPath(prefix);
-
-   try
-   {
-     _readWriteLock.writeLock().lock();
-     final File directory = new File(path);
-     FileUtils.deleteDirectory(directory);
-   }
-   catch (IOException e)
-   {
-     logger.error("fail to remove namespace, path:" + path, e);
-   }
-   finally
-   {
-     _readWriteLock.writeLock().unlock();
-   }
- }
-
- private void doGetPropertyNames(String path, List<String> leafNodes)
- throws PropertyStoreException
- {
- File file = new File(path);
- if (!file.exists())
- {
- return;
- }
-
- // List<String> childs = _zkClient.getChildren(path);
- if (file.isDirectory())
- {
- String[] childs = file.list();
- if (childs == null || childs.length == 0)
- {
- return;
- }
- for (String child : childs)
- {
- String pathToChild = path + "/" + child;
- doGetPropertyNames(pathToChild, leafNodes);
- }
- }
- else if (file.isFile())
- {
- // getProperty(getRelativePath(path));
- leafNodes.add(getRelativePath(path));
- return;
- }
- }
-
-
- /**
-  * Lists the regular files at or below {@code prefix}, collected while holding the read
-  * lock.
-  *
-  * @param prefix namespace key, resolved below the root namespace
-  * @return the collected names, sorted to give a deterministic order
-  */
- @Override
- public List<String> getPropertyNames(String prefix) throws PropertyStoreException
- {
-   final String start = getPath(prefix);
-   final List<String> names = new ArrayList<String>();
-
-   try
-   {
-     _readWriteLock.readLock().lock();
-     doGetPropertyNames(start, names);
-   }
-   finally
-   {
-     _readWriteLock.readLock().unlock();
-   }
-
-   Collections.sort(names);
-   return names;
- }
-
- /**
-  * Not supported by this store.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
- {
-   final String reason = "setPropertyDelimiter() is NOT supported by FilePropertyStore";
-   throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Registers {@code listener} under the path of {@code prefix}. A {@code null} listener
-  * is ignored.
-  *
-  * @param prefix   namespace key, resolved below the root namespace
-  * @param listener listener to add
-  */
- @Override
- public void subscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
-     throws PropertyStoreException
- {
-   if (listener == null)
-   {
-     return;
-   }
-
-   final String path = getPath(prefix);
-   synchronized (_fileChangeListeners)
-   {
-     CopyOnWriteArraySet<PropertyChangeListener <T> > registered = _fileChangeListeners.get(path);
-     if (registered == null)
-     {
-       // first listener for this path: create its set
-       registered = new CopyOnWriteArraySet<PropertyChangeListener <T> >();
-       _fileChangeListeners.put(path, registered);
-     }
-     registered.add(listener);
-   }
- }
-
- /**
-  * Removes {@code listener} from the path of {@code prefix} and drops the path's entry
-  * once no listener is left. A {@code null} listener is ignored.
-  *
-  * @param prefix   namespace key, resolved below the root namespace
-  * @param listener listener to remove
-  */
- @Override
- public void unsubscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
-     throws PropertyStoreException
- {
-   if (listener == null)
-   {
-     return;
-   }
-
-   final String path = getPath(prefix);
-   synchronized (_fileChangeListeners)
-   {
-     final Set<PropertyChangeListener<T> > registered = _fileChangeListeners.get(path);
-     if (registered == null)
-     {
-       _fileChangeListeners.remove(path);
-       return;
-     }
-
-     registered.remove(listener);
-     if (registered.isEmpty())
-     {
-       _fileChangeListeners.remove(path);
-     }
-   }
- }
-
- /**
-  * @return always {@code false}
-  */
- @Override
- public boolean canParentStoreData()
- {
-   final boolean supported = false;
-   return supported;
- }
-
- /**
-  * @return the root namespace of this store, as normalised by the constructor
-  */
- @Override
- public String getPropertyRootNamespace()
- {
-   return this._rootNamespace;
- }
-
- /**
-  * Updates the property for {@code key}; forwards to the three-argument overload with
-  * {@code createIfAbsent} set to {@code true}.
-  */
- @Override
- public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater)
- {
-   final boolean createIfAbsent = true;
-   updatePropertyUntilSucceed(key, updater, createIfAbsent);
- }
-
- /**
-  * Reads the current value for {@code key}, passes it through {@code updater} and
-  * writes the result back, all under the write lock and a file lock. The file is
-  * created when missing. Any failure is logged and not propagated.
-  *
-  * @param key            property key, resolved below the root namespace
-  * @param updater        computes the new value from the current one
-  * @param createIfAbsent NOTE(review): not consulted - the file is always created when
-  *                       missing; confirm the intended contract
-  */
- @Override
- public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater,
-                                        boolean createIfAbsent)
- {
-   String path = getPath(key);
-   File file = new File(path);
-   RandomAccessFile raFile = null;
-   FileLock fLock = null;
-
-   // acquire before entering try so that finally never unlocks a lock that is not held
-   _readWriteLock.writeLock().lock();
-   try
-   {
-     if (!file.exists())
-     {
-       FileUtils.touch(file);
-     }
-
-     raFile = new RandomAccessFile(file, "rw");
-     FileChannel fChannel = raFile.getChannel();
-     fLock = fChannel.lock();
-
-     T current = getProperty(key);
-     T update = updater.update(current);
-     setProperty(key, update);
-   }
-   catch (Exception e)
-   {
-     logger.error("fail to updatePropertyUntilSucceed, path:" + path, e);
-   }
-   finally
-   {
-     // release the file lock BEFORE the write lock: otherwise another thread of this JVM
-     // could enter, try to lock the same file while it is still held here and fail with
-     // OverlappingFileLockException, losing its update
-     try
-     {
-       if (fLock != null && fLock.isValid())
-       {
-         fLock.release();
-       }
-
-       if (raFile != null)
-       {
-         raFile.close();
-       }
-     }
-     catch (IOException e)
-     {
-       logger.error("fail to close file, path:" + path, e);
-     }
-     finally
-     {
-       _readWriteLock.writeLock().unlock();
-     }
-   }
- }
-
-
- /**
-  * Replaces the serializer used by this store; the swap happens under the write lock.
-  *
-  * @param serializer serializer to use from now on
-  */
- @Override
- public void setPropertySerializer(PropertySerializer<T> serializer)
- {
-   _readWriteLock.writeLock().lock();
-   try
-   {
-     this._serializer = serializer;
-   }
-   finally
-   {
-     _readWriteLock.writeLock().unlock();
-   }
- }
-
- /**
-  * Compare-and-set; forwards to the five-argument overload with {@code createIfAbsent}
-  * set to {@code false}.
-  */
- @Override
- public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
- {
-   final boolean createIfAbsent = false;
-   return compareAndSet(key, expected, update, comparator, createIfAbsent);
- }
-
- /**
-  * Stores {@code update} for {@code key} only when the current value compares equal to
-  * {@code expected}. The check and the write run under the write lock and a file lock.
-  *
-  * @param key            property key, resolved below the root namespace
-  * @param expected       value the current content is compared against
-  * @param update         value to store on a match
-  * @param comparator     a result of 0 counts as a match
-  * @param createIfAbsent when {@code true}, the file is created first if it is missing.
-  *                       NOTE(review): opening the file in "rw" mode below also creates a
-  *                       missing file, so {@code false} may not prevent creation - confirm
-  * @return {@code true} when the value was replaced; {@code false} on a mismatch or on
-  *         any failure (failures are logged)
-  */
- @Override
- public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator,
-                              boolean createIfAbsent)
- {
-   String path = getPath(key);
-   File file = new File(path);
-   RandomAccessFile raFile = null;
-   FileLock fLock = null;
-
-   // acquire before entering try so that finally never unlocks a lock that is not held
-   _readWriteLock.writeLock().lock();
-   try
-   {
-     if (createIfAbsent)
-     {
-       file.createNewFile();
-     }
-
-     raFile = new RandomAccessFile(file, "rw");
-     FileChannel fChannel = raFile.getChannel();
-     fLock = fChannel.lock();
-
-     T current = getProperty(key);
-     if (comparator.compare(current, expected) == 0)
-     {
-       setProperty(key, update);
-       return true;
-     }
-
-     return false;
-   }
-   catch (Exception e)
-   {
-     // also covers FileNotFoundException, which was handled identically
-     logger.error("fail to compareAndSet. path:" + path, e);
-     return false;
-   }
-   finally
-   {
-     // release the file lock BEFORE the write lock: otherwise another thread of this JVM
-     // could enter, try to lock the same file while it is still held here and fail with
-     // OverlappingFileLockException
-     try
-     {
-       if (fLock != null && fLock.isValid())
-       {
-         fLock.release();
-       }
-
-       if (raFile != null)
-       {
-         raFile.close();
-       }
-     }
-     catch (IOException e)
-     {
-       logger.error("fail to close file. path:" + path, e);
-     }
-     finally
-     {
-       _readWriteLock.writeLock().unlock();
-     }
-   }
- }
-
- /**
-  * Checks, under the read lock, whether a file or directory exists for {@code key}.
-  *
-  * @param key property key, resolved below the root namespace
-  * @return {@code true} when the resolved path exists
-  */
- @Override
- public boolean exists(String key)
- {
-   String path = getPath(key);
-   File file = new File(path);
-
-   _readWriteLock.readLock().lock();
-   try
-   {
-     return file.exists();
-   }
-   finally
-   {
-     // released even when exists() throws (e.g. SecurityException)
-     _readWriteLock.readLock().unlock();
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
deleted file mode 100644
index 1dab611..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix implementation of file-based property store (Deprecated)
- *
- */
-package com.linkedin.helix.store.file;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
deleted file mode 100644
index 4417993..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix application property store classes
- *
- */
-package com.linkedin.helix.store;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
deleted file mode 100644
index f89bfc6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Pairs the raw bytes of a property with the ZooKeeper {@link Stat} that was
- * handed in together with them. Both references are assigned once in the
- * constructor and never replaced afterwards.
- */
-public class PropertyItem
-{
-  // kept package-private: ZKPropertyStore reads item._stat directly
-  final byte[] _value;
-  final Stat _stat;
-
-  /**
-   * @param value property bytes; the array is stored by reference, not copied
-   * @param stat  stat belonging to the bytes; getVersion() and
-   *              getLastModifiedTime() dereference it, so they fail on null
-   */
-  public PropertyItem(byte[] value, Stat stat)
-  {
-    _value = value;
-    _stat = stat;
-  }
-
-  /** @return the byte array passed to the constructor (same instance) */
-  public byte[] getBytes()
-  {
-    return _value;
-  }
-
-  /** @return the version recorded in the stat */
-  public int getVersion()
-  {
-    return _stat.getVersion();
-  }
-
-  /** @return the mtime recorded in the stat */
-  public long getLastModifiedTime()
-  {
-    return _stat.getMtime();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
deleted file mode 100644
index 4cfc54f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-
-@Deprecated
-public class ZKPropertyStore<T> implements
- PropertyStore<T>,
- IZkStateListener,
- IZkDataListener // , IZkChildListener,
-{
- private static Logger LOG = Logger.getLogger(ZKPropertyStore.class);
-
-  /**
-   * Adapts a typed {@link DataUpdater} so it can be applied to raw bytes:
-   * the current bytes are deserialized, handed to the typed updater, and the
-   * updater's result is serialized again.
-   */
-  class ByteArrayUpdater implements DataUpdater<byte[]>
-  {
-    final DataUpdater<T> _updater;
-    final PropertySerializer<T> _serializer;
-
-    ByteArrayUpdater(DataUpdater<T> updater, PropertySerializer<T> serializer)
-    {
-      _updater = updater;
-      _serializer = serializer;
-    }
-
-    /**
-     * @param current bytes currently stored; null is passed to the typed
-     *                updater as a null value without deserializing
-     * @return the serialized result of the typed updater
-     * @throws IllegalStateException if deserializing or serializing fails
-     */
-    @Override
-    public byte[] update(byte[] current)
-    {
-      try
-      {
-        T currentValue = (current == null) ? null : _serializer.deserialize(current);
-        return _serializer.serialize(_updater.update(currentValue));
-      }
-      catch (PropertyStoreException e)
-      {
-        LOG.error("Exception in update. Updater: " + _updater, e);
-        // Returning null here would hand "null" to the caller as the new data.
-        // NOTE(review): the zk client presumably writes whatever update() returns,
-        // which would wipe the stored value - fail the update instead.
-        throw new IllegalStateException("Exception in update. Updater: " + _updater, e);
-      }
-    }
-  }
-
- private volatile boolean _isConnected = false;
- private volatile boolean _hasSessionExpired = false;
-
- protected final ZkClient _zkClient;
- protected PropertySerializer<T> _serializer;
- protected final String _root;
-
- // zookeeperPath -> userCallback -> zkCallback
- private final Map<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>> _callbackMap =
- new HashMap<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>>();
-
- // TODO cache capacity should be bounded
- private final Map<String, PropertyItem> _cache =
- new ConcurrentHashMap<String, PropertyItem>();
-
- /**
- * The given zkClient is assumed to serialize and deserialize raw byte[]
- * for the given root and its descendants.
- */
- public ZKPropertyStore(ZkClient zkClient, final PropertySerializer<T> serializer,
- String root)
- {
- if (zkClient == null || serializer == null || root == null)
- {
- throw new IllegalArgumentException("zkClient|serializer|root can't be null");
- }
-
- _root = normalizeKey(root);
- _zkClient = zkClient;
-
- setPropertySerializer(serializer);
-
- _zkClient.createPersistent(_root, true);
- _zkClient.subscribeStateChanges(this);
- }
-
-  /**
-   * Rewrites a key so that it starts with exactly one slash: all leading
-   * slashes are dropped and a single one is put back in front.
-   *
-   * @throws IllegalArgumentException if key is null
-   */
-  private String normalizeKey(String key)
-  {
-    if (key == null)
-    {
-      LOG.error("Key can't be null");
-      throw new IllegalArgumentException("Key can't be null");
-    }
-
-    // index of the first character that is not a leading slash
-    int firstNonSlash = 0;
-    while (firstNonSlash < key.length() && key.charAt(firstNonSlash) == '/')
-    {
-      firstNonSlash++;
-    }
-
-    return "/" + key.substring(firstNonSlash);
-  }
-
- private String getAbsolutePath(String key)
- {
- key = normalizeKey(key);
- if (key.equals("/"))
- {
- return _root;
- }
- else
- {
- return _root + key;
- }
- }
-
-  /**
-   * Converts a path under the store root back into a key.
-   *
-   * @param path path that must start with the store root
-   * @return "/" when path equals the root, otherwise the part of path that
-   *         follows the root
-   * @throws IllegalArgumentException if path does not start with the root
-   */
-  String getRelativePath(String path)
-  {
-    if (!path.startsWith(_root))
-    {
-      // a space was missing between the path and the rest of the message
-      String errMsg = path + " does NOT start with property store's root: " + _root;
-      LOG.error(errMsg);
-      throw new IllegalArgumentException(errMsg);
-    }
-
-    return path.equals(_root) ? "/" : path.substring(_root.length());
-  }
-
-  /**
-   * Creates the node for {@code prefix} under the store root if the client
-   * reports that it does not exist yet.
-   *
-   * @throws PropertyStoreException carrying toString() of any exception
-   *         raised by the zk client
-   */
-  @Override
-  public void createPropertyNamespace(String prefix) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(prefix);
-    try
-    {
-      if (!_zkClient.exists(path))
-      {
-        _zkClient.createPersistent(path, true);
-      }
-    }
-    catch (Exception e)
-    {
-      // log message now spells the method name correctly
-      LOG.error("Exception in createPropertyNamespace(" + prefix + ")", e);
-      throw new PropertyStoreException(e.toString());
-    }
-  }
-
-  /**
-   * Writes {@code value} under {@code key}, creating the node first when the
-   * client reports it as missing.
-   *
-   * @throws PropertyStoreException carrying toString() of any exception
-   *         raised while creating, serializing or writing
-   */
-  @Override
-  public void setProperty(String key, final T value) throws PropertyStoreException
-  {
-    final String zkPath = getAbsolutePath(key);
-
-    try
-    {
-      final boolean missing = !_zkClient.exists(zkPath);
-      if (missing)
-      {
-        _zkClient.createPersistent(zkPath, true);
-      }
-
-      // value is handed to the serializer unchecked; it has to cope with null
-      final byte[] serialized = _serializer.serialize(value);
-      _zkClient.writeData(zkPath, serialized);
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when setProperty(" + key + ", " + value + ")", e);
-      throw new PropertyStoreException(e.toString());
-    }
-  }
-
- /**
- * Reads the property stored under {@code key} without reporting stat
- * information; delegates to the two-argument overload with a null stat holder.
- */
- @Override
- public T getProperty(String key) throws PropertyStoreException
- {
- return getProperty(key, null);
- }
-
- // bytes and stat are not null
- private T getValueAndStat(byte[] bytes, Stat stat, PropertyStat propertyStat) throws PropertyStoreException
- {
- T value = _serializer.deserialize(bytes);
-
- if (propertyStat != null)
- {
- propertyStat.setLastModifiedTime(stat.getMtime());
- propertyStat.setVersion(stat.getVersion());
- }
- return value;
- }
-
- /**
- * Reads the property stored under {@code key}, serving it from the local
- * cache when the cached entry still matches the node's current stat.
- *
- * @param key property key; normalized before use
- * @param propertyStat optional holder that receives mtime and version of the
- * returned value; may be null
- * @return the deserialized value, or null when no bytes could be obtained
- * (including a cache hit for which the client returns no stat)
- * @throws PropertyStoreException carrying toString() of any exception raised
- * inside the lookup
- */
- @Override
- public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
- {
- String normalizedKey = normalizeKey(key);
- String path = getAbsolutePath(normalizedKey);
- Stat stat = new Stat();
-
- T value = null;
- try
- {
- // cache lookup, validation and refresh happen under one lock on _cache
- synchronized (_cache)
- {
- PropertyItem item = _cache.get(normalizedKey);
- // this store is registered as data listener for the path on every call
- _zkClient.subscribeDataChanges(path, this);
- if (item != null)
- {
- // cache hit
- stat = _zkClient.getStat(path);
- if (stat != null)
- {
- // cached entry is outdated if the node was re-created (different czxid)
- // or if the node's version is newer than the cached one
- if (item._stat.getCzxid() != stat.getCzxid()
- || item.getVersion() < stat.getVersion())
- {
- // stale data in cache
- // NOTE(review): third argument presumably makes a missing node yield null
- // instead of throwing - confirm against ZkClient.readDataAndStat
- byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
- if (bytes != null)
- {
- value = getValueAndStat(bytes, stat, propertyStat);
- _cache.put(normalizedKey, new PropertyItem(bytes, stat));
- }
- }
- else
- {
- // valid data in cache
- // item.getBytes() should not be null
- value = getValueAndStat(item.getBytes(), stat, propertyStat);
- }
- }
- }
- else
- {
- // cache miss
- byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
- if (bytes != null)
- {
- value = getValueAndStat(bytes, stat, propertyStat);
- _cache.put(normalizedKey, new PropertyItem(bytes, stat));
- }
- }
- }
- return value;
- }
- catch (Exception e)
- {
- LOG.error("Exception in getProperty(" + key + ")", e);
- throw (new PropertyStoreException(e.toString()));
- }
- }
-
-  /**
-   * Deletes the node that backs {@code key}. A node that is already gone is
-   * not treated as an error. The local cache is not touched here.
-   *
-   * @throws PropertyStoreException carrying toString() of any other exception
-   *         raised by the delete
-   */
-  @Override
-  public void removeProperty(String key) throws PropertyStoreException
-  {
-    final String zkPath = getAbsolutePath(normalizeKey(key));
-
-    try
-    {
-      _zkClient.delete(zkPath);
-    }
-    catch (ZkNoNodeException ignored)
-    {
-      // node did not exist - nothing to remove
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in removeProperty(" + key + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
- /**
- * Returns the root path of this store, i.e. the normalized form of the root
- * that was passed to the constructor.
- */
- @Override
- public String getPropertyRootNamespace()
- {
- return _root;
- }
-
-  /**
-   * Recursively deletes the node for {@code prefix}. A node that is already
-   * gone is not treated as an error. The local cache is not touched here.
-   *
-   * @throws PropertyStoreException carrying toString() of any other exception
-   *         raised by the delete
-   */
-  @Override
-  public void removeNamespace(String prefix) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(prefix);
-
-    try
-    {
-      _zkClient.deleteRecursive(path);
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK: nothing to delete
-    }
-    catch (Exception e)
-    {
-      // the message used to name removeProperty, which made failures hard to trace
-      LOG.error("Exception in removeNamespace(" + prefix + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
- // prefix is always normalized
- private void doGetPropertyNames(String prefix, List<String> leafNodes) throws PropertyStoreException
- {
- String path = getAbsolutePath(prefix);
-
- if (!_zkClient.exists(path))
- {
- return;
- }
-
- List<String> childs = _zkClient.getChildren(path);
- if (childs == null)
- {
- return;
- }
-
- if (childs.size() == 0)
- {
- // add leaf node to cache
- // getProperty(prefix);
- leafNodes.add(prefix);
- return;
- }
-
- for (String child : childs)
- {
- String childPath = prefix.equals("/") ? prefix + child : prefix + "/" + child;
- doGetPropertyNames(childPath, leafNodes);
- }
- }
-
-  /**
-   * Collects the keys of all leaf nodes below {@code prefix}.
-   *
-   * @return the leaf keys in natural String order, so the result does not
-   *         depend on the order in which children were listed
-   */
-  @Override
-  public List<String> getPropertyNames(String prefix) throws PropertyStoreException
-  {
-    final List<String> names = new ArrayList<String>();
-    doGetPropertyNames(normalizeKey(prefix), names);
-
-    if (names.size() <= 1)
-    {
-      // nothing to order
-      return names;
-    }
-
-    Collections.sort(names);
-    return names;
-  }
-
- /**
- * Not supported by this store.
- *
- * @throws PropertyStoreException always
- */
- @Override
- public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
- {
- throw new PropertyStoreException("setPropertyDelimiter() not implemented for ZKPropertyStore");
- }
-
- /**
- * Registers {@code listener} for changes at {@code prefix}. A data listener
- * and a child listener are put on the prefix node, and the callback is then
- * invoked once with the node's current children.
- * A listener that is already registered for the same path is left as is and
- * nothing is subscribed again.
- *
- * @throws IllegalArgumentException if listener is null
- * @throws PropertyStoreException carrying toString() of any exception raised
- * while subscribing or during the initial invocation
- */
- @Override
- public void subscribeForPropertyChange(String prefix,
- final PropertyChangeListener<T> listener) throws PropertyStoreException
- {
- if (listener == null)
- {
- throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
- }
-
- String path = getAbsolutePath(prefix);
-
- // stays null when the listener is already registered for this path
- ZkCallbackHandler<T> callback = null;
- synchronized (_callbackMap)
- {
- Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks;
- if (!_callbackMap.containsKey(path))
- {
- _callbackMap.put(path,
- new HashMap<PropertyChangeListener<T>, ZkCallbackHandler<T>>());
- }
- callbacks = _callbackMap.get(path);
-
- if (!callbacks.containsKey(listener))
- {
- callback = new ZkCallbackHandler<T>(_zkClient, this, prefix, listener);
- callbacks.put(listener, callback);
- }
- }
-
- // the zk subscriptions are made outside the _callbackMap lock
- try
- {
- if (callback != null)
- {
- // a newly added callback
- _zkClient.subscribeDataChanges(path, callback);
- _zkClient.subscribeChildChanges(path, callback);
-
- // do initial invocation
- callback.handleChildChange(path, _zkClient.getChildren(path));
-
- LOG.debug("Subscribed changes for " + path);
- }
- }
- catch (Exception e)
- {
- // note: the callback registered above stays in _callbackMap on failure
- LOG.error("Exception in subscribeForPropertyChange(" + prefix + ")", e);
- throw (new PropertyStoreException(e.toString()));
- }
- }
-
- // prefix is always a normalized key
- private void doUnsubscribeForPropertyChange(String prefix, ZkCallbackHandler<T> callback)
- {
- String path = getAbsolutePath(prefix);
-
- _zkClient.unsubscribeDataChanges(path, callback);
- _zkClient.unsubscribeChildChanges(path, callback);
-
- List<String> childs = _zkClient.getChildren(path);
- if (childs == null || childs.size() == 0)
- {
- return;
- }
-
- for (String child : childs)
- {
- doUnsubscribeForPropertyChange(prefix + "/" + child, callback);
- }
- }
-
-  /**
-   * Unregisters {@code listener} from {@code prefix} and removes its zk
-   * listeners from the prefix node and the nodes below it. Does nothing when
-   * the listener was not registered for that path.
-   *
-   * @throws IllegalArgumentException if listener is null
-   */
-  @Override
-  public void unsubscribeForPropertyChange(String prefix,
-      PropertyChangeListener<T> listener) throws PropertyStoreException
-  {
-    if (listener == null)
-    {
-      throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
-    }
-
-    String path = getAbsolutePath(prefix);
-    ZkCallbackHandler<T> callback = null;
-
-    synchronized (_callbackMap)
-    {
-      Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
-          _callbackMap.get(path);
-
-      // check for null BEFORE touching the map; the old code dereferenced first
-      if (callbacks != null)
-      {
-        callback = callbacks.remove(listener);
-      }
-
-      // drop the path entry once no listener is left (no-op if it was absent)
-      if (callbacks == null || callbacks.isEmpty())
-      {
-        _callbackMap.remove(path);
-      }
-    }
-
-    if (callback != null)
-    {
-      doUnsubscribeForPropertyChange(prefix, callback);
-      LOG.debug("Unsubscribed changes for " + path);
-    }
-  }
-
- /**
- * @return always false for this store
- */
- @Override
- public boolean canParentStoreData()
- {
- return false;
- }
-
- /**
- * Replaces the serializer used to convert values to and from bytes.
- *
- * @throws IllegalArgumentException if serializer is null
- */
- @Override
- public void setPropertySerializer(final PropertySerializer<T> serializer)
- {
- if (serializer == null)
- {
- throw new IllegalArgumentException("serializer can't be null");
- }
-
- _serializer = serializer;
- }
-
- /**
- * Applies {@code updater} to the property under {@code key}; same as the
- * three-argument overload with createIfAbsent set to true.
- */
- @Override
- public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater) throws PropertyStoreException
- {
- updatePropertyUntilSucceed(key, updater, true);
- }
-
-  /**
-   * Applies {@code updater} to the property under {@code key} through the zk
-   * client's updateDataSerialized call. The local cache is not refreshed here.
-   *
-   * @param createIfAbsent when the node is missing: true creates it first,
-   *        false makes the call fail
-   * @throws PropertyStoreException carrying toString() of any exception raised
-   *         inside the update, including the "no node exists" failure
-   */
-  @Override
-  public void updatePropertyUntilSucceed(String key,
-      DataUpdater<T> updater,
-      boolean createIfAbsent) throws PropertyStoreException
-  {
-    final String zkPath = getAbsolutePath(key);
-    try
-    {
-      if (!_zkClient.exists(zkPath))
-      {
-        if (createIfAbsent)
-        {
-          _zkClient.createPersistent(zkPath, true);
-        }
-        else
-        {
-          throw new PropertyStoreException("Can't update " + key
-              + " since no node exists");
-        }
-      }
-
-      _zkClient.updateDataSerialized(zkPath, new ByteArrayUpdater(updater, _serializer));
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
-          + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
- /**
- * Conditional write; same as the five-argument overload with createIfAbsent
- * set to true.
- */
- @Override
- public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
- {
- return compareAndSet(key, expected, update, comparator, true);
- }
-
- /**
- * Writes {@code update} under {@code key} only if the value currently stored
- * compares equal to {@code expected} according to {@code comparator}.
- * The write carries the version read together with the current value, so a
- * concurrent modification makes it fail instead of overwriting.
- *
- * @param createIfAbsent when the node is missing: true creates it first,
- * false makes the call return false
- * @return true if the update was written; false if the values differ, the
- * node is missing and must not be created, the version check failed, or any
- * other exception occurred (it is logged, not rethrown)
- */
- @Override
- public boolean compareAndSet(String key,
- T expected,
- T update,
- Comparator<T> comparator,
- boolean createIfAbsent)
- {
- String path = getAbsolutePath(key);
-
- // if two threads call with createIfAbsent=true,
- // one thread creates the node and the other just goes through;
- // at writeData() one thread writes and the other gets ZkBadVersionException
- // note: this existence check and create run outside the try block below
- if (!_zkClient.exists(path))
- {
- if (createIfAbsent)
- {
- _zkClient.createPersistent(path, true);
- }
- else
- {
- return false;
- }
- }
-
- try
- {
- Stat stat = new Stat();
- byte[] currentBytes = _zkClient.readDataAndStat(path, stat, true);
- // a node without data is compared as a null current value
- T current = null;
- if (currentBytes != null)
- {
- current = _serializer.deserialize(currentBytes);
- }
-
- if (comparator.compare(current, expected) == 0)
- {
- byte[] valueBytes = _serializer.serialize(update);
- // conditional write against the version read above
- _zkClient.writeData(path, valueBytes, stat.getVersion());
-
- // update cache
- // getProperty(key);
-
- return true;
- }
- }
- catch (ZkBadVersionException e)
- {
- LOG.warn("Get BadVersion when writing to zookeeper. Mostly Ignorable due to contention");
- }
- catch (Exception e)
- {
- LOG.error("Exception when compareAndSet(" + key + ")", e);
- }
-
- return false;
- }
-
-  /**
-   * Asks the zk client whether the node that backs {@code key} exists.
-   */
-  @Override
-  public boolean exists(String key)
-  {
-    return _zkClient.exists(getAbsolutePath(key));
-  }
-
- /**
- * Tracks the connection state reported by the zk client in the
- * _isConnected and _hasSessionExpired flags. States other than the three
- * handled below leave both flags unchanged.
- */
- @Override
- public void handleStateChanged(KeeperState state) throws Exception
- {
- LOG.info("KeeperState:" + state);
- switch (state)
- {
- case SyncConnected:
- _isConnected = true;
- break;
- case Disconnected:
- _isConnected = false;
- break;
- case Expired:
- // _hasSessionExpired is only ever set to true in this class
- _isConnected = false;
- _hasSessionExpired = true;
- break;
- }
- }
-
-  /**
-   * Re-establishes all registered listeners after a new zookeeper session:
-   * every known callback is subscribed again for data and child changes on
-   * its path and then invoked once with the path's current children.
-   * Empty or null bookkeeping entries found on the way are logged and dropped.
-   */
-  @Override
-  public void handleNewSession() throws Exception
-  {
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    LOG.info("handleNewSession: " + zookeeper.getSessionId());
-
-    synchronized (_callbackMap)
-    {
-      // Iterate over snapshots of the key sets: entries are removed inside the
-      // loops, and removing from a HashMap while iterating its live keySet()
-      // throws ConcurrentModificationException.
-      for (String path : new ArrayList<String>(_callbackMap.keySet()))
-      {
-        Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
-            _callbackMap.get(path);
-        if (callbacks == null || callbacks.isEmpty())
-        {
-          LOG.error("Get a null callback map. Remove it. Path: " + path);
-          _callbackMap.remove(path);
-          continue;
-        }
-
-        for (PropertyChangeListener<T> listener
-            : new ArrayList<PropertyChangeListener<T>>(callbacks.keySet()))
-        {
-          ZkCallbackHandler<T> callback = callbacks.get(listener);
-
-          if (callback == null)
-          {
-            LOG.error("Get a null callback. Remove it. Path: " + path + ", listener: "
-                + listener);
-            callbacks.remove(listener);
-            continue;
-          }
-          _zkClient.subscribeDataChanges(path, callback);
-          _zkClient.subscribeChildChanges(path, callback);
-
-          // do initial invocation
-          callback.handleChildChange(path, _zkClient.getChildren(path));
-        }
-      }
-    }
-  }
-
- /**
- * Not implemented: performs no work.
- *
- * @return always false
- */
- @Override
- public boolean start()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- /**
- * Closes the underlying zk client.
- *
- * @return always true
- */
- @Override
- public boolean stop()
- {
- _zkClient.close();
- return true;
- }
-
- /**
- * Data-change notification for paths this store listens on.
- * Intentionally empty here: the cache is not updated by this callback.
- */
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * Node-deleted notification: stops listening on the deleted path and evicts
- * the corresponding key from the cache, both under the lock on _cache.
- *
- * @throws IllegalArgumentException (from getRelativePath) if dataPath does
- * not start with the store root
- */
- @Override
- public void handleDataDeleted(String dataPath) throws Exception
- {
- // cache keys are paths relative to the store root
- String key = getRelativePath(dataPath);
- synchronized (_cache)
- {
- _zkClient.unsubscribeDataChanges(dataPath, this);
- _cache.remove(key);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
deleted file mode 100644
index 555e517..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Holds what is known about one zookeeper path: its data object, its stat
- * and the names of its children.
- */
-public class ZNode
-{
-  /**
-   * Placeholder stat for a newly created item (zkclient.create() doesn't
-   * return a stat) and for places that don't care about the stat.
-   */
-  public static final Stat ZERO_STAT = new Stat();
-
-  final String _zkPath;
-  private Stat _stat;
-  Object _data;
-  Set<String> _childSet;
-
-  public ZNode(String zkPath, Object data, Stat stat)
-  {
-    _zkPath = zkPath;
-    _data = data;
-    _stat = stat;
-    // start with the shared empty set; a HashSet is only allocated on first add
-    _childSet = Collections.<String>emptySet();
-  }
-
-  /** True while the child set is still the shared empty-set placeholder. */
-  private boolean usesEmptyPlaceholder()
-  {
-    return _childSet == Collections.<String>emptySet();
-  }
-
-  /** Replaces the empty-set placeholder with a mutable set if still in place. */
-  private void ensureMutableChildSet()
-  {
-    if (usesEmptyPlaceholder())
-    {
-      _childSet = new HashSet<String>();
-    }
-  }
-
-  /** Removes a child name; does nothing while no child was ever added. */
-  public void removeChild(String child)
-  {
-    if (usesEmptyPlaceholder())
-    {
-      return;
-    }
-    _childSet.remove(child);
-  }
-
-  /** Adds a single child name. */
-  public void addChild(String child)
-  {
-    ensureMutableChildSet();
-    _childSet.add(child);
-  }
-
-  /** Adds all given child names; a null or empty list is ignored. */
-  public void addChildren(List<String> children)
-  {
-    if (children == null || children.isEmpty())
-    {
-      return;
-    }
-    ensureMutableChildSet();
-    _childSet.addAll(children);
-  }
-
-  public boolean hasChild(String child)
-  {
-    return _childSet.contains(child);
-  }
-
-  /** Returns the live child set (not a copy). */
-  public Set<String> getChildSet()
-  {
-    return _childSet;
-  }
-
-  public void setData(Object data)
-  {
-    _data = data;
-  }
-
-  public Object getData()
-  {
-    return _data;
-  }
-
-  public void setStat(Stat stat)
-  {
-    _stat = stat;
-  }
-
-  public Stat getStat()
-  {
-    return _stat;
-  }
-
-  /**
-   * Replaces the current children with the given names.
-   * A null or empty list leaves the current children untouched.
-   */
-  public void setChildSet(List<String> childNames)
-  {
-    if (childNames == null || childNames.isEmpty())
-    {
-      return;
-    }
-    ensureMutableChildSet();
-    _childSet.clear();
-    _childSet.addAll(childNames);
-  }
-
-  @Override
-  public String toString()
-  {
-    return _zkPath + ", " + _data + ", " + _childSet + ", " + _stat;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
deleted file mode 100644
index 48bb0d2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.store.PropertyChangeListener;
-
-/**
- * Bridges zkclient data/child notifications to a {@link PropertyChangeListener}:
- * zookeeper paths are translated to store keys before the listener is called.
- */
-class ZkCallbackHandler<T> implements IZkChildListener, IZkDataListener
-{
-  private static Logger LOG = Logger.getLogger(ZkCallbackHandler.class);
-
-  private final ZkClient _zkClient;
-  private final ZKPropertyStore<T> _store;
-
-  // listen on prefix and all its childs
-  private final String _prefix;
-  private final PropertyChangeListener<T> _listener;
-
-  public ZkCallbackHandler(ZkClient client, ZKPropertyStore<T> store, String prefix,
-      PropertyChangeListener<T> listener)
-  {
-    _zkClient = client;
-    _store = store;
-    _prefix = prefix;
-    _listener = listener;
-  }
-
-  /** Forwards a data change to the listener, using the key relative to the store root. */
-  @Override
-  public void handleDataChange(String path, Object data) throws Exception
-  {
-    LOG.debug("Data changed @ " + path + " to " + data);
-    _listener.onPropertyChange(_store.getRelativePath(path));
-  }
-
-  /** Deletions are only logged; the listener is not notified. */
-  @Override
-  public void handleDataDeleted(String dataPath) throws Exception
-  {
-    LOG.debug("Data deleted @ " + dataPath);
-  }
-
-  /**
-   * Notifies the listener for {@code path}, then subscribes this handler on
-   * every current child and repeats the same for each child's own children.
-   */
-  @Override
-  public void handleChildChange(String path, List<String> currentChilds) throws Exception
-  {
-    LOG.debug("childs changed @ " + path + " to " + currentChilds);
-
-    // When a node with a child change watcher is deleted, a child change is
-    // triggered on the deleted node with a null child list: nothing to do.
-    if (currentChilds == null)
-    {
-      return;
-    }
-
-    _listener.onPropertyChange(_store.getRelativePath(path));
-
-    final String parent = path.endsWith("/") ? path : path + "/";
-    for (String child : currentChilds)
-    {
-      final String childPath = parent + child;
-      _zkClient.subscribeDataChanges(childPath, this);
-      _zkClient.subscribeChildChanges(childPath, this);
-
-      // recursive call
-      handleChildChange(childPath, _zkClient.getChildren(childPath));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
deleted file mode 100644
index 785efea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-import java.util.List;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkCacheBaseDataAccessor;
-
-/**
- * Property store built on {@link ZkCacheBaseDataAccessor}. It adds no state or
- * behavior of its own: each constructor forwards to a superclass constructor.
- */
-public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T>
-{
- // wraps an existing accessor; the third superclass argument is passed as null
- public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor,
- String root,
- List<String> subscribedPaths)
- {
- super(accessor, root, null, subscribedPaths);
- }
-
- // builds from a zookeeper address; the fourth superclass argument is passed as null
- public ZkHelixPropertyStore(String zkAddress,
- ZkSerializer serializer,
- String chrootPath,
- List<String> zkCachePaths)
- {
- super(zkAddress, serializer, chrootPath, null, zkCachePaths);
- }
-
- // same as above with null passed in place of the cache path list as well
- public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath)
- {
- super(zkAddress, serializer, chrootPath, null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
deleted file mode 100644
index c9b264a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-/**
- * Callback interface for path-level events: data change, node creation and
- * node deletion. Each callback receives only the affected path.
- */
-public interface ZkListener
-{
- // presumably invoked when the data stored at path changed - confirm with implementors
- void handleDataChange(String path);
-
- // presumably invoked when a node was created at path - confirm with implementors
- void handleNodeCreate(String path);
-
- // presumably invoked when the node at path was deleted - confirm with implementors
- void handleNodeDelete(String path);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
deleted file mode 100644
index cd9735c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix implementation of zookeeper-based property store (Deprecated)
- *
- */
-package com.linkedin.helix.store.zk;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java b/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
deleted file mode 100644
index 6783046..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Layout;
-import org.apache.log4j.spi.ErrorCode;
-
-/**
- * File appender that ignores the file name's configured value and instead
- * writes to a freshly named file under the user's home directory each time
- * its options are activated.
- */
-public class CLMLogFileAppender extends FileAppender
-{
-  public CLMLogFileAppender()
-  {
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename, boolean append,
-      boolean bufferedIO, int bufferSize) throws IOException
-  {
-    super(layout, filename, append, bufferedIO, bufferSize);
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename, boolean append)
-      throws IOException
-  {
-    super(layout, filename, append);
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename) throws IOException
-  {
-    super(layout, filename);
-  }
-
-  /**
-   * Replaces the file name with a generated one and opens that file.
-   * Does nothing when no file name has been set; failures are handed to the
-   * appender's error handler instead of being thrown.
-   */
-  public void activateOptions()
-  {
-    if (fileName == null)
-    {
-      return;
-    }
-
-    try
-    {
-      fileName = getNewLogFileName();
-      setFile(fileName, fileAppend, bufferedIO, bufferSize);
-    }
-    catch (Exception e)
-    {
-      errorHandler.error("Error while activating log options", e,
-          ErrorCode.FILE_OPEN_FAILURE);
-    }
-  }
-
-  /**
-   * Builds "user.home/EspressoLogs/CLASS_TIMESTAMP.txt", where CLASS is the
-   * class name of the bottom-most frame of the current thread's stack and
-   * TIMESTAMP is the current time down to milliseconds.
-   */
-  private String getNewLogFileName()
-  {
-    final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
-    final String timestamp = timestampFormat.format(Calendar.getInstance().getTime());
-
-    // last element of the stack trace = outermost frame of this thread
-    final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
-    final String entryClass = frames[frames.length - 1].getClassName();
-
-    return System.getProperty("user.home") + "/EspressoLogs/" + entryClass + "_"
-        + timestamp + ".txt";
-  }
-}