You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2012/06/29 19:42:56 UTC
svn commit: r1355481 [2/4] - in /accumulo/trunk: ./ bin/ core/
core/src/main/java/org/apache/accumulo/core/client/
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org...
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Write-side companion of {@link IZooReader}.
+ *
+ * NOTE(review): the per-method descriptions below are derived from the signatures only; the implementing class defines the exact semantics.
+ */
+public interface IZooReaderWriter extends IZooReader {
+
+  /**
+   * @return a {@link ZooKeeper} handle supplied by the implementation
+   */
+  ZooKeeper getZooKeeper();
+
+  /**
+   * Recursive delete rooted at {@code zPath}.
+   *
+   * @param policy
+   *          presumably selects what happens when the node is missing - confirm against the implementation
+   */
+  void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * Recursive delete rooted at {@code zPath} that additionally takes a node {@code version}.
+   */
+  void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * Create a persistent node with the default ACL
+   *
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * Variant of {@link #putPersistentData(String, byte[], NodeExistsPolicy)}; presumably uses a private ACL - confirm against the implementation.
+   */
+  boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * Variant of {@link #putPersistentData(String, byte[], NodeExistsPolicy)} that additionally takes a node {@code version} and returns nothing.
+   */
+  void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * @return a path string; presumably the path of the sequential node that was created - confirm against the implementation
+   */
+  String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
+
+  /**
+   * @return a path string; presumably the path of the ephemeral sequential node that was created - confirm against the implementation
+   */
+  String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
+
+  /**
+   * Recursive copy from {@code source} to {@code destination}, parameterized by a {@link NodeExistsPolicy}.
+   */
+  void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+
+  /**
+   * Delete of a single {@code path} at the given {@code version}.
+   */
+  void delete(String path, int version) throws InterruptedException, KeeperException;
+
+  /**
+   * Applies {@code mutator} to the node at {@code zPath}.
+   *
+   * @param createValue
+   *          presumably the value used when the node does not exist yet - confirm against the implementation
+   * @return a byte array; presumably the resulting node value
+   */
+  byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception;
+
+  /**
+   * @return whether the lock identified by {@code lockID} is held, as judged by the implementation
+   */
+  boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException;
+
+  /**
+   * Presumably creates {@code path} together with any missing parent nodes - confirm against the implementation.
+   */
+  void mkdirs(String path) throws KeeperException, InterruptedException;
+
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+public class TransactionWatcher {
+
+ private static final Logger log = Logger.getLogger(TransactionWatcher.class);
+ final private Map<Long,AtomicInteger> counts = new HashMap<Long,AtomicInteger>();
+ final private Arbitrator arbitrator;
+
+ public interface Arbitrator {
+ boolean transactionAlive(String type, long tid) throws Exception;
+ }
+
+ public TransactionWatcher(Arbitrator arbitrator) {
+ this.arbitrator = arbitrator;
+ }
+
+ public <T> T run(String ztxBulk, long tid, Callable<T> callable) throws Exception {
+ synchronized (counts) {
+ if (!arbitrator.transactionAlive(ztxBulk, tid)) {
+ throw new Exception("Transaction " + tid + " of type " + ztxBulk + " is no longer active");
+ }
+ AtomicInteger count = counts.get(tid);
+ if (count == null)
+ counts.put(tid, count = new AtomicInteger());
+ count.incrementAndGet();
+ }
+ try {
+ return callable.call();
+ } finally {
+ synchronized (counts) {
+ AtomicInteger count = counts.get(tid);
+ if (count == null) {
+ log.error("unexpected missing count for transaction" + tid);
+ } else {
+ if (count.decrementAndGet() == 0)
+ counts.remove(tid);
+ }
+ }
+ }
+ }
+
+ public boolean isActive(long tid) {
+ synchronized (counts) {
+ log.debug("Transactions in progress " + counts);
+ AtomicInteger count = counts.get(tid);
+ return count != null && count.get() > 0;
+ }
+ }
+
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
+ *
+ */
+public class ZooCache {
+ private static final Logger log = Logger.getLogger(ZooCache.class);
+
+ // watcher passed to every zookeeper read made by this cache; it evicts entries when events arrive
+ private ZCacheWatcher watcher = new ZCacheWatcher();
+ // optional caller-supplied watcher; receives every event after the cache has handled it (may be null)
+ private Watcher externalWatcher = null;
+
+ // path -> node data; a null value records that the node did not exist when it was looked up
+ private HashMap<String,byte[]> cache;
+ // path -> Stat stored alongside the data in put()
+ private HashMap<String,Stat> statCache;
+ // path -> child names as returned by zookeeper
+ private HashMap<String,List<String>> childrenCache;
+
+ // source of the ZooKeeper handle used for all reads
+ private ZooReader zReader;
+
+ private ZooKeeper getZooKeeper() {
+ return zReader.getZooKeeper();
+ }
+
+ /**
+  * Watcher handed to zookeeper by the cache. Node events evict the affected path, a disconnected or expired session empties the whole cache, and every
+  * event is finally forwarded to the external watcher when one was supplied.
+  */
+ private class ZCacheWatcher implements Watcher {
+   @Override
+   public void process(WatchedEvent event) {
+     if (log.isTraceEnabled()) {
+       log.trace(event);
+     }
+
+     switch (event.getType()) {
+       case None:
+         // "None" events carry a session state change rather than a node change
+         onSessionStateChange(event);
+         break;
+       case NodeCreated:
+       case NodeDeleted:
+       case NodeDataChanged:
+       case NodeChildrenChanged:
+         remove(event.getPath());
+         break;
+       default:
+         log.warn("Unhandled: " + event);
+     }
+
+     if (externalWatcher == null) {
+       return;
+     }
+     externalWatcher.process(event);
+   }
+
+   /**
+    * Reacts to a session state change: cached values can no longer be trusted once the connection is lost or the session has expired.
+    */
+   private void onSessionStateChange(WatchedEvent event) {
+     switch (event.getState()) {
+       case SyncConnected:
+         break;
+       case Disconnected:
+         if (log.isTraceEnabled()) {
+           log.trace("Zoo keeper connection disconnected, clearing cache");
+         }
+         clear();
+         break;
+       case Expired:
+         if (log.isTraceEnabled()) {
+           log.trace("Zoo keeper connection expired, clearing cache");
+         }
+         clear();
+         break;
+       default:
+         log.warn("Unhandled: " + event);
+     }
+   }
+ }
+
+ /**
+ * Creates a cache without an external watcher; delegates to the three-argument constructor with a null watcher.
+ */
+ public ZooCache(String zooKeepers, int sessionTimeout) {
+ this(zooKeepers, sessionTimeout, null);
+ }
+
+ /**
+ * Creates a cache that reads through a new ZooReader built from the given connection string and session timeout.
+ *
+ * @param watcher
+ * optional watcher that is forwarded every event seen by the cache; may be null
+ */
+ public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
+ this(new ZooReader(zooKeepers, sessionTimeout), watcher);
+ }
+
+ /**
+  * Creates an empty cache on top of an existing reader.
+  *
+  * @param reader
+  *          supplies the ZooKeeper handle for every lookup
+  * @param watcher
+  *          optional watcher that is forwarded every event seen by the cache; may be null
+  */
+ public ZooCache(ZooReader reader, Watcher watcher) {
+   this.externalWatcher = watcher;
+   this.zReader = reader;
+
+   // all three maps start out empty and are only touched under this object's monitor
+   this.childrenCache = new HashMap<String,List<String>>();
+   this.statCache = new HashMap<String,Stat>();
+   this.cache = new HashMap<String,byte[]>();
+ }
+
+ /**
+ * A unit of zookeeper work that retry() can execute repeatedly until it completes without throwing.
+ */
+ private static interface ZooRunnable {
+ void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
+ }
+
+ /**
+  * Runs {@code op} until it completes without throwing, pausing between attempts with a randomly growing delay. This method only returns once the
+  * operation has succeeded.
+  *
+  * @param op
+  *          the zookeeper operation to execute; it is handed a freshly fetched ZooKeeper handle on every attempt
+  */
+ private synchronized void retry(ZooRunnable op) {
+
+   int sleepTime = 100;
+
+   while (true) {
+
+     ZooKeeper zooKeeper = getZooKeeper();
+
+     try {
+       op.run(zooKeeper);
+       return;
+
+     } catch (KeeperException e) {
+       if (e.code() == Code.NONODE) {
+         log.error("Looked up non existant node in cache " + e.getPath(), e);
+       }
+       log.warn("Zookeeper error, will retry", e);
+     } catch (InterruptedException e) {
+       log.info("Zookeeper error, will retry", e);
+     } catch (ConcurrentModificationException e) {
+       log.debug("Zookeeper was modified, will retry");
+     }
+
+     try {
+       // wait() rather than Thread.sleep(): it releases this object's monitor, so the lock is not held while pausing
+       wait(sleepTime);
+     } catch (InterruptedException e) {
+       // Report through the class logger instead of printing to stderr. The interrupt status is deliberately not restored:
+       // with the flag set every later wait() would fail immediately and this loop would spin without backing off.
+       log.warn("Interrupted while waiting to retry zookeeper operation", e);
+     }
+     if (sleepTime < 10000) {
+       sleepTime = (int) (sleepTime + sleepTime * Math.random());
+     }
+
+   }
+ }
+
+ /**
+  * Returns the child names of {@code zPath}, loading them from zookeeper (and registering the cache watcher) on the first request.
+  *
+  * @return an unmodifiable view of the cached child list, or null when nothing is cached for the path because zookeeper reported that the node does not
+  *         exist
+  */
+ public synchronized List<String> getChildren(final String zPath) {
+
+   retry(new ZooRunnable() {
+
+     @Override
+     public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+       if (!childrenCache.containsKey(zPath)) {
+         try {
+           childrenCache.put(zPath, zooKeeper.getChildren(zPath, watcher));
+         } catch (KeeperException ke) {
+           // a missing node is an ordinary outcome: leave the cache untouched; anything else goes back to retry()
+           if (ke.code() != Code.NONODE) {
+             throw ke;
+           }
+         }
+       }
+     }
+
+   });
+
+   final List<String> cached = childrenCache.get(zPath);
+   return cached == null ? null : Collections.unmodifiableList(cached);
+ }
+
+ /**
+ * Returns the cached data for zPath without reporting the node's Stat; delegates to get(zPath, null).
+ */
+ public synchronized byte[] get(final String zPath) {
+ return get(zPath, null);
+ }
+
+ /**
+  * Returns the data stored at {@code zPath}, loading it from zookeeper on the first request and serving it from the cache afterwards.
+  *
+  * @param zPath
+  *          path of the node to read
+  * @param stat
+  *          when non-null and a Stat is cached for the path, it is overwritten with a copy of the cached Stat
+  * @return the cached data; null when the node did not exist or holds no data
+  */
+ public synchronized byte[] get(final String zPath, Stat stat) {
+
+   retry(new ZooRunnable() {
+
+     @Override
+     public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+
+       if (cache.containsKey(zPath)) {
+         return;
+       }
+
+       /*
+        * exists() must be called first because the cache also remembers that a node is absent. Only a watch set through exists() fires when the node is
+        * later created, which is what evicts the "absent" entry. Calling getData() alone and special-casing Code.NONODE would set no such watch, so
+        * non-existence could not be cached.
+        */
+       final Stat nodeStat = zooKeeper.exists(zPath, watcher);
+       byte[] payload = null;
+
+       if (nodeStat != null) {
+         try {
+           payload = zooKeeper.getData(zPath, watcher, nodeStat);
+         } catch (KeeperException.BadVersionException e1) {
+           // the node changed between exists() and getData(); signal retry() to start over
+           throw new ConcurrentModificationException();
+         } catch (KeeperException.NoNodeException e2) {
+           throw new ConcurrentModificationException();
+         }
+         if (log.isTraceEnabled()) {
+           log.trace("zookeeper contained " + zPath + " " + (payload == null ? null : new String(payload)));
+         }
+       } else if (log.isTraceEnabled()) {
+         log.trace("zookeeper did not contain " + zPath);
+       }
+
+       if (log.isTraceEnabled()) {
+         log.trace("putting " + zPath + " " + (payload == null ? null : new String(payload)) + " in cache");
+       }
+       put(zPath, payload, nodeStat);
+     }
+
+   });
+
+   final Stat cachedStat = (stat == null) ? null : statCache.get(zPath);
+   if (cachedStat != null) {
+     // copy the cached Stat into the caller's instance by round-tripping it through its own serialization
+     try {
+       final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+       final DataOutputStream out = new DataOutputStream(buffer);
+       cachedStat.write(out);
+       out.close();
+
+       final DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+       stat.readFields(in);
+       in.close();
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
+   return cache.get(zPath);
+ }
+
+ /**
+ * Records the data and Stat for zPath. Either value may be null; get() stores null for both when the node does not exist.
+ */
+ private synchronized void put(String zPath, byte[] data, Stat stat) {
+ cache.put(zPath, data);
+ statCache.put(zPath, stat);
+ }
+
+ /**
+  * Evicts everything cached for exactly {@code zPath}: its data, its Stat and its child list.
+  */
+ private synchronized void remove(String zPath) {
+   if (log.isTraceEnabled()) {
+     log.trace("removing " + zPath + " from cache");
+   }
+   statCache.remove(zPath);
+   childrenCache.remove(zPath);
+   cache.remove(zPath);
+ }
+
+ /**
+ * Empties the data, children and Stat caches.
+ */
+ public synchronized void clear() {
+ cache.clear();
+ childrenCache.clear();
+ statCache.clear();
+ }
+
+ /**
+  * Evicts every cached entry (data, children and Stat) whose path starts with {@code zPath}.
+  *
+  * The match is a plain string prefix test, so clearing "/a" also evicts "/ab".
+  *
+  * @param zPath
+  *          path prefix of the entries to evict
+  */
+ public synchronized void clear(String zPath) {
+   removeKeysWithPrefix(cache, zPath);
+   removeKeysWithPrefix(childrenCache, zPath);
+   removeKeysWithPrefix(statCache, zPath);
+ }
+
+ /**
+  * Removes from {@code map} every entry whose key starts with {@code prefix}.
+  */
+ private static void removeKeysWithPrefix(Map<String,?> map, String prefix) {
+   // removal goes through the iterator so the key set can be modified while it is being walked
+   for (Iterator<String> keys = map.keySet().iterator(); keys.hasNext();) {
+     if (keys.next().startsWith(prefix)) {
+       keys.remove();
+     }
+   }
+ }
+
+ // shared caches, keyed by "<zooKeepers>:<sessionTimeout>"; only touched inside the synchronized getInstance()
+ private static Map<String,ZooCache> instances = new HashMap<String,ZooCache>();
+
+ /**
+  * Returns the shared cache for the given connection string and session timeout, creating it on the first request.
+  */
+ public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) {
+   final String key = zooKeepers + ":" + sessionTimeout;
+
+   ZooCache shared = instances.get(key);
+   if (shared != null) {
+     return shared;
+   }
+
+   shared = new ZooCache(zooKeepers, sessionTimeout);
+   instances.put(key, shared);
+   return shared;
+ }
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooLock implements Watcher {
+
+ protected static final Logger log = Logger.getLogger(ZooLock.class);
+
+ // name prefix of the ephemeral sequential nodes created under the lock path
+ public static final String LOCK_PREFIX = "zlock-";
+
+ /**
+ * Why a held lock was lost: LOCK_DELETED is reported when the lock node is deleted (including by unlock()), SESSION_EXPIRED when the zookeeper session
+ * expires.
+ */
+ public enum LockLossReason {
+ LOCK_DELETED, SESSION_EXPIRED
+ }
+
+ /**
+ * Callback notified when a lock that had been acquired is no longer held.
+ */
+ public interface LockWatcher {
+ void lostLock(LockLossReason reason);
+ }
+
+ /**
+ * Callback for lockAsync(): in addition to lock loss, it is told when the lock is acquired or when the attempt fails.
+ */
+ public interface AsyncLockWatcher extends LockWatcher {
+ void acquiredLock();
+
+ void failedToAcquireLock(Exception e);
+ }
+
+ // reset to false at the start of every lockAsync() call and set to true once the lock is acquired
+ private boolean lockWasAcquired;
+ // parent node under which the lock nodes are created
+ final private String path;
+ protected final IZooReaderWriter zooKeeper;
+ // name (relative to path) of the lock node currently held; null while no lock is held
+ private String lock;
+ // watcher to notify when the held lock is lost; null while no lock is held
+ private LockWatcher lockWatcher;
+
+ // name (relative to path) of the node created for an acquisition that is still pending; null otherwise
+ private String asyncLock;
+
+ /**
+ * Creates a lock rooted at path, using a new ZooCache and the ZooReaderWriter instance obtained for the given connection parameters.
+ */
+ public ZooLock(String zookeepers, int timeInMillis, String auth, String path) {
+ this(new ZooCache(zookeepers, timeInMillis), ZooReaderWriter.getInstance(zookeepers, timeInMillis, auth), path);
+ }
+
+ /**
+  * Creates a lock rooted at {@code path} and registers this object as a watcher on that path.
+  *
+  * @param zc
+  *          cache used by the static {@code getLockData(String ...)} helpers and by {@code getSessionId()}
+  * @param zrw
+  *          reader/writer used for all zookeeper access by this lock
+  * @param path
+  *          parent node under which lock nodes are created
+  */
+ protected ZooLock(ZooCache zc, IZooReaderWriter zrw, String path) {
+   // NOTE(review): this is a static field, so every new ZooLock replaces the cache used by the static helpers - confirm this is intended
+   getLockDataZooCache = zc;
+   this.path = path;
+   zooKeeper = zrw;
+   try {
+     zooKeeper.getStatus(path, this);
+   } catch (Exception ex) {
+     // best effort: failing to set the watch must not prevent construction
+     log.warn("Error setting initial watch on ZooLock", ex);
+   }
+ }
+
+ /**
+  * Adapter used by tryLock(): remembers whether the lock was acquired, ignores acquisition failures and passes lock loss on to the caller's watcher.
+  */
+ private static class TryLockAsyncLockWatcher implements AsyncLockWatcher {
+
+   boolean acquiredLock = false;
+   LockWatcher lw;
+
+   public TryLockAsyncLockWatcher(LockWatcher delegate) {
+     this.lw = delegate;
+   }
+
+   @Override
+   public void lostLock(LockLossReason reason) {
+     lw.lostLock(reason);
+   }
+
+   @Override
+   public void acquiredLock() {
+     acquiredLock = true;
+   }
+
+   @Override
+   public void failedToAcquireLock(Exception e) {
+     // tryLock() reports failure through its return value, so nothing to do here
+   }
+
+ }
+
+ /**
+  * Makes a single attempt to take the lock.
+  *
+  * @param lw
+  *          notified if the lock is acquired and later lost
+  * @param data
+  *          data stored in the lock node
+  * @return true if the lock was acquired; false otherwise, in which case any node left over from the attempt has been deleted
+  */
+ public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException, InterruptedException {
+
+   final TryLockAsyncLockWatcher attempt = new TryLockAsyncLockWatcher(lw);
+   lockAsync(attempt, data);
+
+   final boolean acquired = attempt.acquiredLock;
+
+   if (!acquired && asyncLock != null) {
+     // give up our place in the queue instead of waiting for the lock
+     zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+     asyncLock = null;
+   }
+
+   return acquired;
+ }
+
+ /**
+ * One step of the acquisition protocol for the pending node myLock. If myLock sorts first among the children of path the lock is acquired and lw is
+ * told; otherwise a watch is placed on the child that sorts immediately before it and this method runs again once that child is deleted.
+ *
+ * @param myLock
+ * name (relative to path) of the node created for this attempt
+ * @param lw
+ * callback told about acquisition or failure
+ * @throws IllegalStateException
+ * if no acquisition is pending (asyncLock is null)
+ * @throws RuntimeException
+ * if myLock is no longer among the children of path
+ */
+ private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw) throws KeeperException, InterruptedException {
+
+ if (asyncLock == null) {
+ throw new IllegalStateException("Called lockAsync() when asyncLock == null");
+ }
+
+ List<String> children = zooKeeper.getChildren(path);
+
+ if (!children.contains(myLock)) {
+ throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
+ }
+
+ Collections.sort(children);
+
+ // the child that sorts first holds the lock
+ if (children.get(0).equals(myLock)) {
+ this.lockWatcher = lw;
+ this.lock = myLock;
+ asyncLock = null;
+ lockWasAcquired = true;
+ lw.acquiredLock();
+ return;
+ }
+ // find the child immediately ahead of ours; prev cannot stay null here because myLock is present and is not the first child
+ String prev = null;
+ for (String child : children) {
+ if (child.equals(myLock)) {
+ break;
+ }
+
+ prev = child;
+ }
+
+ final String lockToWatch = path + "/" + prev;
+
+ // watch only the predecessor, so its deletion triggers just this waiter
+ Stat stat = zooKeeper.getStatus(path + "/" + prev, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockToWatch)) {
+ synchronized (ZooLock.this) {
+ try {
+ // only re-evaluate while the attempt is still pending; asyncLock is null once it was acquired, cancelled or failed
+ if (asyncLock != null) {
+ lockAsync(myLock, lw);
+ } else if (log.isTraceEnabled()) {
+ log.trace("While waiting for another lock " + lockToWatch + " " + myLock + " was deleted");
+ }
+ } catch (Exception e) {
+ if (lock == null) {
+ // have not acquired lock yet
+ lw.failedToAcquireLock(e);
+ }
+ }
+ }
+ }
+
+ if (event.getState() == KeeperState.Expired) {
+ synchronized (ZooLock.this) {
+ if (lock == null) {
+ lw.failedToAcquireLock(new Exception("Zookeeper Session expired"));
+ }
+ }
+ }
+ }
+
+ });
+
+ // a null Stat is treated as "the predecessor is already gone", so re-evaluate immediately instead of waiting for an event
+ if (stat == null)
+ lockAsync(myLock, lw);
+ }
+
+ /**
+ * Starts acquiring the lock: creates an ephemeral sequential node under path holding data, watches that node for deletion, and then runs the
+ * acquisition step. The outcome is reported through lw; KeeperException and InterruptedException are passed to lw.failedToAcquireLock() rather than
+ * thrown.
+ *
+ * @param lw
+ * callback told about acquisition, failure and later loss of the lock
+ * @param data
+ * data stored in the lock node
+ * @throws IllegalStateException
+ * if a lock is already held or an acquisition is already pending
+ */
+ public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
+
+ if (lockWatcher != null || lock != null || asyncLock != null) {
+ throw new IllegalStateException();
+ }
+
+ lockWasAcquired = false;
+
+ try {
+ String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+
+ // watch our own node: its deletion means either a held lock was lost or a pending attempt was cancelled
+ Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+ public void process(WatchedEvent event) {
+ synchronized (ZooLock.this) {
+ if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
+ // clear the state before calling out, so the watcher sees the lock as already released
+ LockWatcher localLw = lockWatcher;
+ lock = null;
+ lockWatcher = null;
+
+ localLw.lostLock(LockLossReason.LOCK_DELETED);
+
+ } else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
+ lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+ asyncLock = null;
+ }
+ }
+ }
+ });
+
+ if (stat == null) {
+ lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
+ return;
+ }
+
+ // keep only the node name; the rest of the class addresses lock nodes relative to path
+ asyncLock = asyncLockPath.substring(path.length() + 1);
+
+ lockAsync(asyncLock, lw);
+
+ // NOTE(review): when the call above throws, asyncLock stays set after the failure is reported; tryLock() cleans it up, other callers should confirm
+ } catch (KeeperException e) {
+ lw.failedToAcquireLock(e);
+ } catch (InterruptedException e) {
+ lw.failedToAcquireLock(e);
+ }
+ }
+
+ /**
+  * Deletes the node of a pending acquisition, if there is one, and releases the lock, if it is held.
+  *
+  * @return true if either of those actions was taken
+  */
+ public synchronized boolean tryToCancelAsyncLockOrUnlock() throws InterruptedException, KeeperException {
+   final boolean hadPendingAttempt = asyncLock != null;
+   if (hadPendingAttempt) {
+     zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+   }
+
+   final boolean heldLock = lock != null;
+   if (heldLock) {
+     unlock();
+   }
+
+   return hadPendingAttempt || heldLock;
+ }
+
+ /**
+  * Releases the held lock by deleting its node, then tells the lock watcher the lock is gone (reason LOCK_DELETED).
+  *
+  * @throws IllegalStateException
+  *           if no lock is held
+  */
+ public synchronized void unlock() throws InterruptedException, KeeperException {
+   if (lock == null) {
+     throw new IllegalStateException();
+   }
+
+   // capture and clear the state before touching zookeeper, so the deletion watch finds no held lock to report
+   final String heldNode = lock;
+   final LockWatcher watcherToNotify = lockWatcher;
+   lock = null;
+   lockWatcher = null;
+
+   zooKeeper.recursiveDelete(path + "/" + heldNode, NodeMissingPolicy.SKIP);
+
+   watcherToNotify.lostLock(LockLossReason.LOCK_DELETED);
+ }
+
+ /**
+  * @return the full path of the held lock node, or null when no lock is held
+  */
+ public synchronized String getLockPath() {
+   return lock == null ? null : path + "/" + lock;
+ }
+
+ /**
+ * @return the name of the held lock node relative to the lock path, or null when no lock is held
+ */
+ public synchronized String getLockName() {
+ return lock;
+ }
+
+ /**
+  * Builds an identifier for the held lock from the lock path, the lock node name and the current zookeeper session id.
+  *
+  * @throws IllegalStateException
+  *           if no lock is held
+  */
+ public synchronized LockID getLockID() {
+   if (lock == null) {
+     throw new IllegalStateException("Lock not held");
+   }
+   final long sessionId = zooKeeper.getZooKeeper().getSessionId();
+   return new LockID(path, lock, sessionId);
+ }
+
+ /**
+ * Indicates whether the lock was acquired at some point since the most recent lockAsync() call. This helps distinguish a lock that was never held from
+ * one that was held and then lost.
+ *
+ * @return true if the lock was acquired, otherwise false.
+ */
+ public synchronized boolean wasLockAcquired() {
+ return lockWasAcquired;
+ }
+
+ /**
+ * @return true while this object holds the lock
+ */
+ public synchronized boolean isLocked() {
+ return lock != null;
+ }
+
+ /**
+  * Watcher callback registered on the lock path by the constructor. When the session expires while the lock is held, the lock state is cleared and
+  * the lock watcher is told the lock was lost (reason SESSION_EXPIRED).
+  */
+ @Override
+ public synchronized void process(WatchedEvent event) {
+   log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+
+   if (event.getState() != KeeperState.Expired || lock == null) {
+     return;
+   }
+
+   // clear the state before calling out, so the watcher sees the lock as already released
+   final LockWatcher watcherToNotify = lockWatcher;
+   lock = null;
+   lockWatcher = null;
+   watcherToNotify.lostLock(LockLossReason.SESSION_EXPIRED);
+ }
+
+ /**
+  * Checks directly against zookeeper whether the lock identified by {@code lid} is held: its node must sort first among the children of the lock
+  * path, must still exist, and must be owned by the session recorded in {@code lid}.
+  */
+ public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+
+   final List<String> lockNodes = zk.getChildren(lid.path, false);
+   if (lockNodes == null || lockNodes.isEmpty()) {
+     return false;
+   }
+
+   Collections.sort(lockNodes);
+   final String holder = lockNodes.get(0);
+   if (!lid.node.equals(holder)) {
+     return false;
+   }
+
+   final Stat holderStat = zk.exists(lid.path + "/" + lid.node, false);
+   if (holderStat == null) {
+     return false;
+   }
+   return holderStat.getEphemeralOwner() == lid.eid;
+ }
+
+ /**
+  * Cache-backed variant of the lock check: the node named in {@code lid} must sort first among the cached children of the lock path, must have
+  * non-null cached data, and its cached Stat must name the session recorded in {@code lid} as ephemeral owner.
+  */
+ public static boolean isLockHeld(ZooCache zc, LockID lid) {
+
+   final List<String> cachedNodes = zc.getChildren(lid.path);
+   if (cachedNodes == null || cachedNodes.isEmpty()) {
+     return false;
+   }
+
+   // the cache hands out an unmodifiable list, so sort a private copy
+   final List<String> lockNodes = new ArrayList<String>(cachedNodes);
+   Collections.sort(lockNodes);
+
+   final String holder = lockNodes.get(0);
+   if (!lid.node.equals(holder)) {
+     return false;
+   }
+
+   final Stat holderStat = new Stat();
+   final byte[] holderData = zc.get(lid.path + "/" + lid.node, holderStat);
+   return holderData != null && holderStat.getEphemeralOwner() == lid.eid;
+ }
+
+ /**
+  * Reads, directly from zookeeper, the data of the child of {@code path} that sorts first.
+  *
+  * @return that node's data, or null when {@code path} has no children
+  */
+ public static byte[] getLockData(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
+   final List<String> lockNodes = zk.getChildren(path, false);
+   if (lockNodes == null || lockNodes.isEmpty()) {
+     return null;
+   }
+
+   Collections.sort(lockNodes);
+   final String holder = lockNodes.get(0);
+
+   return zk.getData(path + "/" + holder, false, null);
+ }
+
+ /**
+  * Reads, through the cache, the data of the child of {@code path} that sorts first.
+  *
+  * @param stat
+  *          passed through to the cache lookup, which fills it in when it is non-null and a Stat is cached
+  * @return that node's data, or null when {@code path} has no children
+  * @throws RuntimeException
+  *           if the first child is not named like a lock node
+  */
+ public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, Stat stat) {
+
+   final List<String> cachedNodes = zc.getChildren(path);
+   if (cachedNodes == null || cachedNodes.isEmpty()) {
+     return null;
+   }
+
+   // the cache hands out an unmodifiable list, so sort a private copy
+   final List<String> lockNodes = new ArrayList<String>(cachedNodes);
+   Collections.sort(lockNodes);
+
+   final String holder = lockNodes.get(0);
+   if (!holder.startsWith(LOCK_PREFIX)) {
+     throw new RuntimeException("Node " + holder + " at " + path + " is not a lock node");
+   }
+
+   return zc.get(path + "/" + holder, stat);
+ }
+
+ // cache used by the static helpers below; assigned by the ZooLock constructor
+ // NOTE(review): appears to stay null until the first ZooLock is constructed - confirm callers cannot reach the helpers before that
+ private static ZooCache getLockDataZooCache;
+
+ /**
+ * Reads the lock data at path through the shared cache, without reporting the node's Stat.
+ */
+ public static byte[] getLockData(String path) {
+ return getLockData(path, null);
+ }
+
+ /**
+ * Reads the lock data at path through the shared cache; stat is passed on to the cache lookup.
+ */
+ public static byte[] getLockData(String path, Stat stat) {
+ return getLockData(getLockDataZooCache, path, stat);
+ }
+
+ /**
+  * Looks up, through the cache, the ephemeral owner of the child of {@code path} that sorts first.
+  *
+  * @return the owning session id, or 0 when {@code path} has no children or the cache returns null data for that child
+  */
+ public static long getSessionId(ZooCache zc, String path) throws KeeperException, InterruptedException {
+   final List<String> cachedNodes = zc.getChildren(path);
+   if (cachedNodes == null || cachedNodes.isEmpty()) {
+     return 0;
+   }
+
+   // the cache hands out an unmodifiable list, so sort a private copy
+   final List<String> lockNodes = new ArrayList<String>(cachedNodes);
+   Collections.sort(lockNodes);
+   final String holder = lockNodes.get(0);
+
+   final Stat holderStat = new Stat();
+   final byte[] holderData = zc.get(path + "/" + holder, holderStat);
+   return holderData == null ? 0 : holderStat.getEphemeralOwner();
+ }
+
+ /**
+ * Looks up the session id owning the first lock node under this lock's path, using the shared static cache.
+ */
+ public long getSessionId() throws KeeperException, InterruptedException {
+ return getSessionId(getLockDataZooCache, path);
+ }
+
+ /**
+  * Deletes the child of {@code path} that sorts first, whoever created it.
+  *
+  * @throws IllegalStateException
+  *           if {@code path} has no children
+  * @throws RuntimeException
+  *           if the first child is not named like a lock node
+  */
+ public static void deleteLock(IZooReaderWriter zk, String path) throws InterruptedException, KeeperException {
+   final List<String> lockNodes = zk.getChildren(path);
+   if (lockNodes == null || lockNodes.isEmpty()) {
+     throw new IllegalStateException("No lock is held at " + path);
+   }
+
+   Collections.sort(lockNodes);
+   final String holder = lockNodes.get(0);
+
+   if (!holder.startsWith(LOCK_PREFIX)) {
+     throw new RuntimeException("Node " + holder + " at " + path + " is not a lock node");
+   }
+
+   zk.recursiveDelete(path + "/" + holder, NodeMissingPolicy.SKIP);
+ }
+
+ /**
+  * Deletes the child of {@code path} that sorts first, but only if its data equals {@code lockData}.
+  *
+  * @param zk
+  *          reader/writer used to inspect and delete the node
+  * @param path
+  *          parent node of the lock nodes
+  * @param lockData
+  *          expected content of the lock node, compared against the node data decoded with the platform default charset
+  * @return true if the node matched and was deleted; false if the data did not match, which includes a node without data and a null {@code lockData}
+  * @throws IllegalStateException
+  *           if {@code path} has no children
+  * @throws RuntimeException
+  *           if the first child is not named like a lock node
+  */
+ public static boolean deleteLock(IZooReaderWriter zk, String path, String lockData) throws InterruptedException, KeeperException {
+   List<String> children;
+
+   children = zk.getChildren(path);
+
+   if (children == null || children.size() == 0) {
+     throw new IllegalStateException("No lock is held at " + path);
+   }
+
+   Collections.sort(children);
+
+   String lockNode = children.get(0);
+
+   if (!lockNode.startsWith(LOCK_PREFIX)) {
+     throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
+   }
+
+   Stat stat = new Stat();
+   byte[] data = zk.getData(path + "/" + lockNode, stat);
+
+   // A node may hold no data at all; treat that, and a null lockData, as "does not match" instead of failing with a NullPointerException.
+   if (data != null && new String(data).equals(lockData)) {
+     // the version guard with FAIL makes the delete fail if the node changed or vanished after it was read
+     zk.recursiveDelete(path + "/" + lockNode, stat.getVersion(), NodeMissingPolicy.FAIL);
+     return true;
+   }
+
+   return false;
+ }
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+
+/**
+ * {@link QueueLock} whose entries are sequential ZooKeeper nodes named {@code lock-<sequence>} beneath a parent path.
+ * <p>
+ * Every failure coming out of ZooKeeper is rethrown wrapped in a {@link RuntimeException}.
+ */
+public class ZooQueueLock implements QueueLock {
+
+  private static final String PREFIX = "lock-";
+
+  private final IZooReaderWriter zoo;
+  private final String path;
+  private final boolean ephemeral;
+
+  /**
+   * Creates a queue lock that talks to ZooKeeper through {@link ZooReaderWriter#getRetryingInstance(String, int, String)}.
+   *
+   * @param path
+   *          parent node under which the entries are created
+   * @param ephemeral
+   *          true to create ephemeral sequential entries, false for persistent sequential entries
+   */
+  public ZooQueueLock(String zookeepers, int timeInMillis, String auth, String path, boolean ephemeral) throws KeeperException, InterruptedException {
+    this(ZooReaderWriter.getRetryingInstance(zookeepers, timeInMillis, auth), path, ephemeral);
+  }
+
+  protected ZooQueueLock(IZooReaderWriter zrw, String path, boolean ephemeral) {
+    this.zoo = zrw;
+    this.path = path;
+    this.ephemeral = ephemeral;
+  }
+
+  /**
+   * Adds a new sequential entry holding {@code data}, creating the parent node first if it is missing.
+   *
+   * @return the sequence number parsed from the name of the created node
+   */
+  @Override
+  public long addEntry(byte[] data) {
+    try {
+      while (true) {
+        try {
+          String newPath;
+          if (ephemeral) {
+            newPath = zoo.putEphemeralSequential(path + "/" + PREFIX, data);
+          } else {
+            newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data);
+          }
+          String[] parts = newPath.split("/");
+          String last = parts[parts.length - 1];
+          return Long.parseLong(last.substring(PREFIX.length()));
+        } catch (NoNodeException nne) {
+          // the parent does not exist so try to create it, then loop and retry the entry
+          zoo.putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Returns the data of every entry whose sequence number is less than or equal to {@code entry}, keyed by sequence number.
+   * <p>
+   * A missing parent yields an empty map; entries that vanish between the listing and the read are left out.
+   */
+  @Override
+  public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
+    SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
+    try {
+      List<String> children = Collections.emptyList();
+      try {
+        children = zoo.getChildren(path);
+      } catch (KeeperException.NoNodeException ex) {
+        // the path does not exist (it was deleted or not created yet), that is ok there are no earlier entries then
+      }
+
+      for (String name : children) {
+        long order = Long.parseLong(name.substring(PREFIX.length()));
+        if (order > entry) {
+          // later entries are never returned, so do not spend a ZooKeeper read on them
+          continue;
+        }
+        // this try catch must be done inside the loop because some subset of the children may exist
+        try {
+          result.put(order, zoo.getData(path + "/" + name, null));
+        } catch (KeeperException.NoNodeException ex) {
+          // the entry was removed after the children were listed; skip it
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    return result;
+  }
+
+  /**
+   * Deletes the entry with the given sequence number (a missing entry is skipped) and then tries to delete the parent node as well.
+   */
+  @Override
+  public void removeEntry(long entry) {
+    try {
+      zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP);
+      try {
+        // try to delete the parent if it has no children
+        zoo.delete(path, -1);
+      } catch (NotEmptyException nee) {
+        // the path had other lock nodes, no big deal
+      } catch (NoNodeException nne) {
+        // someone else deleted the lock path
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooReader implements IZooReader {
+
+ protected String keepers;
+ protected int timeout;
+
+ protected ZooKeeper getSession(String keepers, int timeout, String auth) {
+ return ZooSession.getSession(keepers, timeout, auth);
+ }
+
+ protected ZooKeeper getZooKeeper() {
+ return getSession(keepers, timeout, null);
+ }
+
+ @Override
+ public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
+ return getZooKeeper().getData(zPath, false, stat);
+ }
+
+ @Override
+ public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
+ return getZooKeeper().exists(zPath, false);
+ }
+
+ @Override
+ public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ return getZooKeeper().exists(zPath, watcher);
+ }
+
+ @Override
+ public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
+ return getZooKeeper().getChildren(zPath, false);
+ }
+
+ @Override
+ public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ return getZooKeeper().getChildren(zPath, watcher);
+ }
+
+ @Override
+ public boolean exists(String zPath) throws KeeperException, InterruptedException {
+ return getZooKeeper().exists(zPath, false) != null;
+ }
+
+ @Override
+ public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ return getZooKeeper().exists(zPath, watcher) != null;
+ }
+
+ public ZooReader(String keepers, int timeout) {
+ this.keepers = keepers;
+ this.timeout = timeout;
+ }
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.security.SecurityPermission;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * {@link IZooReaderWriter} implementation that adds write operations on top of {@link ZooReader}, mostly by delegating to {@link ZooUtil}.
+ */
+public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
+
+  private static final SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
+
+  private static ZooReaderWriter instance = null;
+  private static IZooReaderWriter retryingInstance = null;
+  private final String auth;
+
+  /**
+   * Returns the session for this writer's auth string. When a {@link SecurityManager} is installed, the caller must hold the
+   * {@code zookeeperWriterPermission} security permission.
+   */
+  @Override
+  public ZooKeeper getZooKeeper() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(ZOOWRITER_PERMISSION);
+    }
+    return getSession(keepers, timeout, auth);
+  }
+
+  /**
+   * @param string
+   *          connection string passed to {@link ZooReader}
+   * @param timeInMillis
+   *          timeout passed to {@link ZooReader}
+   * @param auth
+   *          secret; it is stored prefixed with {@code "accumulo:"} and used when obtaining the session
+   */
+  public ZooReaderWriter(String string, int timeInMillis, String auth) {
+    super(string, timeInMillis);
+    this.auth = "accumulo:" + auth;
+  }
+
+  @Override
+  public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
+  }
+
+  @Override
+  public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy);
+  }
+
+  /**
+   * Create a persistent node with the default ACL
+   *
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  @Override
+  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy);
+  }
+
+  @Override
+  public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy);
+  }
+
+  @Override
+  public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy);
+  }
+
+  @Override
+  public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data);
+  }
+
+  @Override
+  public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
+  }
+
+  @Override
+  public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy);
+  }
+
+  @Override
+  public void delete(String path, int version) throws InterruptedException, KeeperException {
+    getZooKeeper().delete(path, version);
+  }
+
+  /** Callback used by {@link #mutate(String, byte[], List, Mutator)} to compute a node's new value from its current value. */
+  public interface Mutator {
+    byte[] mutate(byte[] currentValue) throws Exception;
+  }
+
+  /**
+   * Creates the node with {@code createValue} if that is possible, otherwise repeatedly reads the node, applies {@code mutator} and writes the
+   * result back conditioned on the version that was read, until a write succeeds.
+   *
+   * @param createValue
+   *          initial value to create the node with; when null no create is attempted
+   * @param acl
+   *          ACL used only for the create
+   * @return {@code createValue} if the node was created, the value written by a successful update, or null if the mutator returned null (in
+   *         which case nothing is written)
+   */
+  @Override
+  public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception {
+    if (createValue != null) {
+      try {
+        getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
+        return createValue;
+      } catch (NodeExistsException ex) {
+        // expected: the node is already there, fall through and mutate it
+      }
+    }
+    do {
+      Stat stat = new Stat();
+      byte[] data = getZooKeeper().getData(zPath, false, stat);
+      data = mutator.mutate(data);
+      if (data == null)
+        return data;
+      try {
+        getZooKeeper().setData(zPath, data, stat.getVersion());
+        return data;
+      } catch (BadVersionException ex) {
+        // someone else changed the node since it was read; read again and retry
+      }
+    } while (true);
+  }
+
+  /**
+   * Returns the process-wide instance, creating it on the first call. The arguments are only used by the call that creates the instance; later
+   * calls return the existing instance whatever arguments they pass.
+   */
+  public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String auth) {
+    if (instance == null)
+      instance = new ZooReaderWriter(zookeepers, timeInMillis, auth);
+    return instance;
+  }
+
+  /**
+   * get an instance that retries when zookeeper connection errors occur
+   * <p>
+   * The result is a proxy around {@link #getInstance(String, int, String)} that is created once; as with that method, the arguments only matter on
+   * the first call. A call failing with {@link KeeperException.ConnectionLossException} is repeated after a pause that starts at 250 ms and grows
+   * by 250 ms per attempt up to 5000 ms; any other failure is rethrown as is.
+   *
+   * @return an instance that retries when Zookeeper connection errors occur.
+   */
+  public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String auth) {
+
+    if (retryingInstance == null) {
+      final IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, auth);
+
+      InvocationHandler ih = new InvocationHandler() {
+        @Override
+        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+          long retryTime = 250;
+          while (true) {
+            try {
+              return method.invoke(inst, args);
+            } catch (InvocationTargetException e) {
+              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
+                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
+                UtilWaitThread.sleep(retryTime);
+                retryTime = Math.min(5000, retryTime + 250);
+              } else {
+                // unwrap so callers see the exception the real method threw
+                throw e.getCause();
+              }
+            }
+          }
+        }
+      };
+
+      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
+    }
+
+    return retryingInstance;
+  }
+
+  @Override
+  public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException {
+    return ZooUtil.isLockHeld(getZooKeeper(), lockID);
+  }
+
+  /**
+   * Creates {@code path} and any missing ancestors as persistent nodes with empty data. An empty path is a no-op.
+   *
+   * @throws IllegalArgumentException
+   *           if a non-empty {@code path} does not start with {@code /}
+   */
+  @Override
+  public void mkdirs(String path) throws KeeperException, InterruptedException {
+    if (path.equals(""))
+      return;
+    if (!path.startsWith("/"))
+      throw new IllegalArgumentException(path + " does not start with /");
+    if (getZooKeeper().exists(path, false) != null)
+      return;
+    // create the ancestors first; the recursion ends at the empty string or at the first existing node
+    String parent = path.substring(0, path.lastIndexOf("/"));
+    mkdirs(parent);
+    putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
+  }
+
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Reservations stored as a single persistent node whose data is {@code <reservationID>:<debugInfo>}.
+ */
+public class ZooReservation {
+
+  /**
+   * Tries to reserve {@code path} for {@code reservationID}.
+   *
+   * @param reservationID
+   *          id of the reserver; must not contain {@code ':'} because that character separates the id from the debug info
+   * @param debugInfo
+   *          free text stored after the id
+   * @return true if the node was created, or if it already exists and the id stored in it equals {@code reservationID}; false if another id is
+   *         stored there
+   * @throws IllegalArgumentException
+   *           if {@code reservationID} contains {@code ':'}
+   */
+  public static boolean attempt(IZooReaderWriter zk, String path, String reservationID, String debugInfo) throws KeeperException, InterruptedException {
+    if (reservationID.contains(":"))
+      throw new IllegalArgumentException("reservationID must not contain ':' : " + reservationID);
+
+    while (true) {
+      try {
+        zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(), NodeExistsPolicy.FAIL);
+        return true;
+      } catch (NodeExistsException nee) {
+        Stat stat = new Stat();
+        byte[] zooData;
+        try {
+          zooData = zk.getData(path, stat);
+        } catch (NoNodeException nne) {
+          // the node disappeared between the failed create and the read, so try to create it again
+          continue;
+        }
+
+        return idOf(zooData).equals(reservationID);
+      }
+    }
+
+  }
+
+  /**
+   * Releases the reservation at {@code path}. A missing node is logged at debug level and otherwise ignored.
+   *
+   * @throws IllegalStateException
+   *           if the id stored in the node is not {@code reservationID}
+   */
+  public static void release(IZooReaderWriter zk, String path, String reservationID) throws KeeperException, InterruptedException {
+    Stat stat = new Stat();
+    byte[] zooData;
+
+    try {
+      zooData = zk.getData(path, stat);
+    } catch (NoNodeException e) {
+      // TODO log warning? this may happen as a normal course of business.... could return a boolean...
+      Logger.getLogger(ZooReservation.class).debug("Node does not exist " + path);
+      return;
+    }
+
+    if (!idOf(zooData).equals(reservationID)) {
+      throw new IllegalStateException("Tried to release reservation " + path + " with data mismatch " + reservationID + " " + new String(zooData));
+    }
+
+    // pass along the version that was read together with the data
+    zk.recursiveDelete(path, stat.getVersion(), NodeMissingPolicy.SKIP);
+  }
+
+  /**
+   * Extracts the reservation id, i.e. everything before the first {@code ':'} (or the whole text when there is none). Unlike
+   * {@code split(":")[0]} this also copes with data made up solely of {@code ':'} characters, for which {@code split} returns an empty array.
+   */
+  private static String idOf(byte[] zooData) {
+    String text = new String(zooData);
+    int colon = text.indexOf(':');
+    return colon < 0 ? text : text.substring(0, colon);
+  }
+
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+/**
+ * Cache of {@link ZooKeeper} handles keyed by connection string, timeout and auth, so that callers asking for the same combination share one
+ * handle.
+ */
+class ZooSession {
+
+  private static final Logger log = Logger.getLogger(ZooSession.class);
+
+  /** Holder for a cached handle. */
+  private static class ZooSessionInfo {
+    // the watcher argument is accepted but not stored
+    public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
+      this.zooKeeper = zooKeeper;
+    }
+
+    ZooKeeper zooKeeper;
+  }
+
+  private static final Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
+
+  /** Builds the cache key; a null auth and an empty auth produce the same key. */
+  private static String sessionKey(String keepers, int timeout, String auth) {
+    return keepers + ":" + timeout + ":" + (auth == null ? "" : auth);
+  }
+
+  /** Default watcher handed to each new handle: forwards events to its registered watchers and logs session expiry. */
+  private static class ZooWatcher implements Watcher {
+
+    private final HashSet<Watcher> watchers = new HashSet<Watcher>();
+
+    public void process(WatchedEvent event) {
+      // copy the watchers, in case the callback adds() more Watchers
+      // otherwise we get a ConcurrentModificationException
+      Collection<Watcher> watcherCopy = new ArrayList<Watcher>(watchers);
+
+      for (Watcher watcher : watcherCopy) {
+        watcher.process(event);
+      }
+
+      if (event.getState() == KeeperState.Expired) {
+        log.debug("Session expired, state of current session : " + event.getState());
+      }
+    }
+
+  }
+
+  /**
+   * Opens a handle and blocks until it is connected, retrying for as long as it takes.
+   * <p>
+   * Each attempt polls the connection state every 100 ms for up to 10 s. An attempt that does not connect is closed and followed by a sleep that
+   * starts at 100 ms and grows by a random factor until it reaches 10 s.
+   *
+   * @param auth
+   *          digest auth added to the handle once it is connected; null for none
+   * @return a connected handle
+   * @throws RuntimeException
+   *           wrapping an {@link UnknownHostException}, which is not retried
+   */
+  public static ZooKeeper connect(String host, int timeout, String auth, Watcher watcher) {
+    final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+    final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000;
+    boolean tryAgain = true;
+    int sleepTime = 100;
+    ZooKeeper zooKeeper = null;
+
+    while (tryAgain) {
+      try {
+        zooKeeper = new ZooKeeper(host, timeout, watcher);
+        // it may take some time to get connected to zookeeper if some of the servers are down
+        for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+          if (zooKeeper.getState().equals(States.CONNECTED)) {
+            if (auth != null)
+              zooKeeper.addAuthInfo("digest", auth.getBytes());
+            tryAgain = false;
+          } else
+            UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+        }
+      } catch (UnknownHostException uhe) {
+        // do not expect to recover from this
+        log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
+        throw new RuntimeException(uhe);
+      } catch (IOException e) {
+        log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e);
+      } finally {
+        // an attempt that did not connect must not leave its handle open
+        if (tryAgain && zooKeeper != null)
+          try {
+            zooKeeper.close();
+            zooKeeper = null;
+          } catch (InterruptedException e) {
+            log.warn("interrupted", e);
+          }
+      }
+
+      if (tryAgain) {
+        UtilWaitThread.sleep(sleepTime);
+        if (sleepTime < 10000)
+          sleepTime = (int) (sleepTime + sleepTime * Math.random());
+      }
+    }
+
+    return zooKeeper;
+  }
+
+  /** Same as {@link #getSession(String, int, String)} with a null auth. */
+  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
+    return getSession(zooKeepers, timeout, null);
+  }
+
+  /**
+   * Returns the cached handle for the given connection string, timeout and auth, connecting a new one when none is cached or the cached one is
+   * closed.
+   * <p>
+   * A handle created with auth is also cached under the auth-less key if that key is free, so later auth-less requests can share it.
+   */
+  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) {
+
+    String sessionKey = sessionKey(zooKeepers, timeout, auth);
+
+    // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+    String readOnlySessionKey = sessionKey(zooKeepers, timeout, null);
+
+    ZooSessionInfo zsi = sessions.get(sessionKey);
+    if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+      // drop the closed handle, including the shared auth-less entry if it points at the same handle
+      if (auth != null && sessions.get(readOnlySessionKey) == zsi)
+        sessions.remove(readOnlySessionKey);
+      zsi = null;
+      sessions.remove(sessionKey);
+    }
+
+    if (zsi == null) {
+      ZooWatcher watcher = new ZooWatcher();
+      // only report whether auth is used, never the auth string itself
+      log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + (auth == null ? " without auth" : " with auth"));
+      zsi = new ZooSessionInfo(connect(zooKeepers, timeout, auth, watcher), watcher);
+      sessions.put(sessionKey, zsi);
+      if (auth != null && !sessions.containsKey(readOnlySessionKey))
+        sessions.put(readOnlySessionKey, zsi);
+    }
+    return zsi.zooKeeper;
+  }
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Static helpers for creating, copying and deleting ZooKeeper nodes, and for inspecting lock nodes.
+ */
+public class ZooUtil {
+  /** What to do when a node that is about to be created already exists. */
+  public enum NodeExistsPolicy {
+    SKIP, OVERWRITE, FAIL
+  }
+
+  /** What to do when a node that is expected to exist is missing. */
+  public enum NodeMissingPolicy {
+    SKIP, CREATE, FAIL
+  }
+
+  /**
+   * Identifies a lock by the path of its parent, the name of the lock node and an id that {@link ZooUtil#isLockHeld(ZooKeeper, LockID)} compares
+   * with the node's ephemeral owner. The serialized form is {@code <path relative to root>/<node>$<eid in hex>}.
+   */
+  public static class LockID {
+    public long eid;
+    public String path;
+    public String node;
+
+    /**
+     * Parses a lock id produced by {@link #serialize(String)}.
+     *
+     * @param root
+     *          prefix to put in front of the serialized relative path
+     * @throws IllegalArgumentException
+     *           if {@code serializedLID} does not consist of exactly two {@code $}-separated parts or the first part contains no {@code /}
+     */
+    public LockID(String root, String serializedLID) {
+      String sa[] = serializedLID.split("\\$");
+
+      // check the length before touching sa[0]: split() returns an empty array for input made up only of '$'
+      if (sa.length != 2) {
+        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+      }
+
+      int lastSlash = sa[0].lastIndexOf('/');
+      if (lastSlash < 0) {
+        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+      }
+
+      if (lastSlash == 0)
+        path = root;
+      else
+        path = root + "/" + sa[0].substring(0, lastSlash);
+      node = sa[0].substring(lastSlash + 1);
+      eid = Long.parseLong(sa[1], 16);
+    }
+
+    public LockID(String path, String node, long eid) {
+      this.path = path;
+      this.node = node;
+      this.eid = eid;
+    }
+
+    /** Serializes this id with {@code path} made relative by cutting off the first {@code root.length()} characters. */
+    public String serialize(String root) {
+
+      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
+    }
+
+    @Override
+    public String toString() {
+      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
+    }
+  }
+
+  /** ACL granting everything to the creator only. */
+  public static final List<ACL> PRIVATE;
+  /** {@link #PRIVATE} plus read access for anyone. */
+  public static final List<ACL> PUBLIC;
+  static {
+    PRIVATE = new ArrayList<ACL>();
+    PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
+    PUBLIC = new ArrayList<ACL>();
+    PUBLIC.addAll(PRIVATE);
+    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+  }
+
+  /**
+   * This method will delete a node and all its children from zookeeper
+   * <p>
+   * Children are deleted regardless of their version and are skipped if they are already gone; {@code version} and {@code policy} apply to
+   * {@code zPath} itself.
+   *
+   * @param zPath
+   *          the path to delete
+   * @param version
+   *          version {@code zPath} must have for it to be deleted, as understood by {@link ZooKeeper#delete(String, int)}; -1 matches any version
+   * @param policy
+   *          SKIP to ignore a missing {@code zPath}, FAIL to let the resulting exception through; CREATE is not allowed
+   * @throws IllegalArgumentException
+   *           if {@code policy} is CREATE
+   */
+  public static void recursiveDelete(ZooKeeper zk, String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    if (policy.equals(NodeMissingPolicy.CREATE))
+      throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
+    try {
+      for (String child : zk.getChildren(zPath, false))
+        recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
+
+      // honor the caller's version so a node changed since it was read is not deleted by mistake
+      zk.delete(zPath, version);
+    } catch (KeeperException e) {
+      if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
+        return;
+      throw e;
+    }
+  }
+
+  /** Deletes {@code zPath} and all its children whatever their versions; see {@link #recursiveDelete(ZooKeeper, String, int, NodeMissingPolicy)}. */
+  public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    recursiveDelete(zk, zPath, -1, policy);
+  }
+
+  /**
+   * Create a persistent node with the default ACL
+   *
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
+  }
+
+  /** Like {@link #putPersistentData(ZooKeeper, String, byte[], NodeExistsPolicy)}; {@code version} is used when an existing node is overwritten. */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
+  }
+
+  /** Like {@link #putPersistentData(ZooKeeper, String, byte[], int, NodeExistsPolicy)} but creates the node with the given ACLs. */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
+      throws KeeperException, InterruptedException {
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
+  }
+
+  /**
+   * Creates the node, or applies {@code policy} when it already exists: SKIP leaves it alone, OVERWRITE sets its data (conditioned on
+   * {@code version}), anything else rethrows the NodeExistsException. A null policy is treated as FAIL.
+   *
+   * @return true if the node was created or overwritten; false if it was skipped
+   */
+  private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
+      throws KeeperException, InterruptedException {
+    if (policy == null)
+      policy = NodeExistsPolicy.FAIL;
+
+    while (true) {
+      try {
+        zk.create(zPath, data, acls, mode);
+        return true;
+      } catch (NodeExistsException nee) {
+        switch (policy) {
+          case SKIP:
+            return false;
+          case OVERWRITE:
+            try {
+              zk.setData(zPath, data, version);
+              return true;
+            } catch (NoNodeException nne) {
+              // node delete between create call and set data, so try create call again
+              continue;
+            }
+          default:
+            throw nee;
+        }
+      }
+    }
+  }
+
+  public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
+    return zk.getData(zPath, false, stat);
+  }
+
+  public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
+    return zk.exists(zPath, false);
+  }
+
+  public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
+    return getStatus(zk, zPath) != null;
+  }
+
+  /**
+   * Copies {@code source} and its descendants to {@code destination} as persistent nodes. Nodes with a non-zero ephemeral owner, and everything
+   * below them, are not copied.
+   *
+   * @param policy
+   *          applied when {@code destination} already exists: OVERWRITE continues, SKIP returns without copying, FAIL throws a NODEEXISTS
+   *          exception
+   * @throws KeeperException
+   *           with code NONODE if {@code source} does not exist, or NODEEXISTS as described above
+   */
+  public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    if (!exists(zk, source))
+      throw KeeperException.create(Code.NONODE, source);
+    if (exists(zk, destination)) {
+      switch (policy) {
+        case OVERWRITE:
+          break;
+        case SKIP:
+          return;
+        case FAIL:
+        default:
+          // report the node that actually exists
+          throw KeeperException.create(Code.NODEEXISTS, destination);
+      }
+    }
+
+    Stat stat = new Stat();
+    byte[] data = zk.getData(source, false, stat);
+    if (stat.getEphemeralOwner() == 0) {
+      if (data == null)
+        throw KeeperException.create(Code.NONODE, source);
+      putPersistentData(zk, destination, data, policy);
+      if (stat.getNumChildren() > 0)
+        for (String child : zk.getChildren(source, false))
+          recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
+    }
+  }
+
+  /** Like {@link #putPersistentData(ZooKeeper, String, byte[], NodeExistsPolicy)} but creates the node with the {@link #PRIVATE} ACL. */
+  public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
+  }
+
+  /** @return the path of the persistent sequential node that was created */
+  public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  /** @return the path of the ephemeral sequential node that was created */
+  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  /**
+   * Returns the cached data of the child of {@code path} whose name sorts first.
+   *
+   * @return that child's data, or null when the cache reports no children
+   */
+  public static byte[] getLockData(ZooCache zc, String path) {
+
+    List<String> children = zc.getChildren(path);
+
+    // defensive: treat a null child list the same as an empty one
+    if (children == null || children.size() == 0) {
+      return null;
+    }
+
+    // sort a copy so the list handed out by the cache is left untouched
+    children = new ArrayList<String>(children);
+    Collections.sort(children);
+
+    String lockNode = children.get(0);
+
+    return zc.get(path + "/" + lockNode);
+  }
+
+  /**
+   * Checks whether the lock described by {@code lid} is the current holder.
+   *
+   * @return true only if the first child (in sorted order) of {@code lid.path} is {@code lid.node} and that node's ephemeral owner equals
+   *         {@code lid.eid}
+   */
+  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+
+    List<String> children = zk.getChildren(lid.path, false);
+
+    if (children.size() == 0) {
+      return false;
+    }
+
+    Collections.sort(children);
+
+    String lockNode = children.get(0);
+    if (!lid.node.equals(lockNode))
+      return false;
+
+    Stat stat = zk.exists(lid.path + "/" + lid.node, false);
+    return stat != null && stat.getEphemeralOwner() == lid.eid;
+  }
+
+}
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import junit.framework.Assert;
+
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock;
+import org.junit.Test;
+
+/**
+ * Exercises {@link DistributedReadWriteLock} on top of an in-memory {@link QueueLock}, so no ZooKeeper server is involved.
+ */
+public class DistributedReadWriteLockTest {
+
+  // Non-zookeeper version of QueueLock
+  public static class MockQueueLock implements QueueLock {
+
+    // id handed to the next entry added
+    long next = 0L;
+    // entry id -> data supplied when the entry was added
+    SortedMap<Long,byte[]> locks = new TreeMap<Long,byte[]>();
+
+    /**
+     * Returns a copy of every entry whose id is less than or equal to {@code entry}.
+     */
+    @Override
+    synchronized public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
+      SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
+      // headMap excludes its bound, so entry + 1 keeps the entry itself in the result
+      result.putAll(locks.headMap(entry + 1));
+      return result;
+    }
+
+    /**
+     * Removes {@code entry} and notifies any thread waiting on the {@code locks} monitor.
+     */
+    @Override
+    synchronized public void removeEntry(long entry) {
+      synchronized (locks) {
+        locks.remove(entry);
+        locks.notifyAll();
+      }
+    }
+
+    /**
+     * Stores {@code data} under the next id, notifies any thread waiting on the {@code locks} monitor and returns that id.
+     */
+    @Override
+    synchronized public long addEntry(byte[] data) {
+      long result;
+      synchronized (locks) {
+        locks.put(result = next++, data);
+        locks.notifyAll();
+      }
+      return result;
+    }
+  }
+
+  // some data that is probably not going to update atomically
+  static class SomeData {
+    volatile int[] data = new int[100];
+    volatile int counter;
+
+    // fails unless every element matches the counter, i.e. unless no write is half done
+    void read() {
+      for (int i = 0; i < data.length; i++)
+        Assert.assertEquals(counter, data[i]);
+    }
+
+    // bumps the counter first, then fills the array from the end, so an unprotected reader can see a mix of old and new values
+    void write() {
+      ++counter;
+      for (int i = data.length - 1; i >= 0; i--)
+        data[i] = counter;
+    }
+  }
+
+  /**
+   * Takes and releases the read and write locks from a single thread, then runs one writer and one reader thread against shared data and fails if either
+   * thread threw.
+   */
+  @Test
+  public void testLock() throws Exception {
+    final SomeData data = new SomeData();
+    data.write();
+    data.read();
+    QueueLock qlock = new MockQueueLock();
+
+    final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes());
+    Lock readLock = locker.readLock();
+    Lock writeLock = locker.writeLock();
+    readLock.lock();
+    readLock.unlock();
+    writeLock.lock();
+    writeLock.unlock();
+    readLock.lock();
+    readLock.unlock();
+
+    // do a bunch of reads/writes in separate threads, look for inconsistent updates
+    Thread[] threads = new Thread[2];
+    // One slot per worker. An assertion failing inside a worker would otherwise only terminate that thread and the test would still pass.
+    // Each worker writes only its own slot, and Thread.join() makes that write visible to this thread.
+    final Throwable[] failures = new Throwable[threads.length];
+    for (int i = 0; i < threads.length; i++) {
+      final int which = i;
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            if (which % 2 == 0) {
+              Lock wl = locker.writeLock();
+              wl.lock();
+              try {
+                data.write();
+              } finally {
+                wl.unlock();
+              }
+            } else {
+              Lock rl = locker.readLock();
+              rl.lock();
+              // everything done while holding the lock stays inside the try so a failed assertion cannot leave the lock held
+              try {
+                data.read();
+              } finally {
+                rl.unlock();
+              }
+            }
+          } catch (Throwable t) {
+            // assertion failures are Errors, hence Throwable; reported by the test thread after join()
+            failures[which] = t;
+          }
+        }
+      };
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    for (int i = 0; i < failures.length; i++) {
+      if (failures[i] != null) {
+        AssertionError error = new AssertionError("worker thread " + i + " failed: " + failures[i]);
+        error.initCause(failures[i]);
+        throw error;
+      }
+    }
+  }
+
+}
Propchange: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+/**
+ * Drives {@link TransactionWatcher} with an in-memory arbitrator and checks when work is accepted and when a transaction is reported active.
+ */
+public class TransactionWatcherTest {
+
+  /**
+   * Arbitrator that keeps the started transaction ids of each type in a map.
+   */
+  static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
+    Map<String,List<Long>> map = new HashMap<String,List<Long>>();
+
+    /**
+     * Records {@code txid} as started for {@code txType}; throws if it is already recorded.
+     */
+    public synchronized void start(String txType, Long txid) throws Exception {
+      List<Long> started = map.get(txType);
+      if (started == null) {
+        started = new ArrayList<Long>();
+      } else if (started.contains(txid)) {
+        throw new Exception("transaction already started");
+      }
+      started.add(txid);
+      map.put(txType, started);
+    }
+
+    /**
+     * Forgets {@code txid} for {@code txType}; throws if it was never started or has already been stopped.
+     */
+    public synchronized void stop(String txType, Long txid) throws Exception {
+      final List<Long> started = map.get(txType);
+      final int position = started == null ? -1 : started.indexOf(txid);
+      if (position < 0) {
+        throw new Exception("transaction does not exist");
+      }
+      started.remove(position);
+    }
+
+    /**
+     * Returns whether {@code tid} is currently recorded as started for {@code txType}.
+     */
+    @Override
+    synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
+      final List<Long> started = map.get(txType);
+      return started != null && started.contains(tid);
+    }
+
+  }
+
+  // work item that fails the test if the watcher ever runs it
+  private static Callable<Object> mustNotRun() {
+    return new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        Assert.fail("Should not be able to start a new work on a discontinued transaction");
+        return null;
+      }
+    };
+  }
+
+  @Test
+  public void testTransactionWatcher() throws Exception {
+    final String txType = "someName";
+    final long firstTxid = 7;
+    final SimpleArbitrator arbitrator = new SimpleArbitrator();
+    final TransactionWatcher watcher = new TransactionWatcher(arbitrator);
+
+    // the arbitrator must reject a second start of the same transaction
+    arbitrator.start(txType, firstTxid);
+    try {
+      arbitrator.start(txType, firstTxid);
+      Assert.fail("simple arbitrator did not throw an exception");
+    } catch (Exception expected) {
+      // expected
+    }
+
+    // the transaction is reported active only while work for it is running
+    watcher.isActive(firstTxid);
+    Assert.assertFalse(watcher.isActive(firstTxid));
+    watcher.run(txType, firstTxid, new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        Assert.assertTrue(watcher.isActive(firstTxid));
+        return null;
+      }
+    });
+    Assert.assertFalse(watcher.isActive(firstTxid));
+
+    // once the arbitrator has stopped the transaction, new work must be refused
+    arbitrator.stop(txType, firstTxid);
+    Assert.assertFalse(arbitrator.transactionAlive(txType, firstTxid));
+    try {
+      watcher.run(txType, firstTxid, mustNotRun());
+      Assert.fail("work against stopped transaction should fail");
+    } catch (Exception expected) {
+      // expected
+    }
+
+    // stopping a transaction from inside its own running work: nested work is refused, the outer work still counts as active
+    final long secondTxid = 9;
+    arbitrator.start(txType, secondTxid);
+    watcher.run(txType, secondTxid, new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        Assert.assertTrue(watcher.isActive(secondTxid));
+        arbitrator.stop(txType, secondTxid);
+        try {
+          watcher.run(txType, secondTxid, mustNotRun());
+          Assert.fail("work against a stopped transaction should fail");
+        } catch (Exception expected) {
+          // expected
+        }
+        Assert.assertTrue(watcher.isActive(secondTxid));
+        return null;
+      }
+    });
+
+  }
+
+}
Propchange: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: accumulo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/pom.xml?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/pom.xml (original)
+++ accumulo/trunk/pom.xml Fri Jun 29 17:42:35 2012
@@ -48,6 +48,7 @@
<modules>
<module>trace</module>
<module>core</module>
+ <module>fate</module>
<module>server</module>
<module>start</module>
<module>examples</module>
@@ -528,6 +529,11 @@
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-fate</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-start</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
Modified: accumulo/trunk/server/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/pom.xml?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/pom.xml (original)
+++ accumulo/trunk/server/pom.xml Fri Jun 29 17:42:35 2012
@@ -56,6 +56,10 @@
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-fate</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java Fri Jun 29 17:42:35 2012
@@ -35,8 +35,8 @@ import org.apache.accumulo.core.util.Byt
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.StringUtil;
import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.zookeeper.ZooLock;
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java Fri Jun 29 17:42:35 2012
@@ -30,8 +30,8 @@ import org.apache.accumulo.core.client.I
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.log4j.Logger;
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java Fri Jun 29 17:42:35 2012
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.client.I
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance.AccumuloNotInitializedException;
import org.apache.log4j.Logger;