You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2013/06/28 02:23:52 UTC
svn commit: r1497620 - in /accumulo/branches/ACCUMULO-CURATOR:
fate/src/main/java/org/apache/accumulo/fate/curator/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/
server/src/main/java/org/apache/accumulo/server/curator/
Author: vines
Date: Fri Jun 28 00:23:52 2013
New Revision: 1497620
URL: http://svn.apache.org/r1497620
Log:
And some other files...
Added:
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java (with props)
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java (with props)
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java (with props)
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java (with props)
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java (with props)
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java (with props)
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java (with props)
Removed:
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.log4j.Logger;
+
+/**
+ * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
+ *
+ */
+public class CuratorCaches {
+ private static final Logger log = Logger.getLogger(CuratorCaches.class);
+
+ private HashMap<String,NodeCache> nodeCache;
+ private HashMap<String,PathChildrenCache> childrenCache;
+
+ private CuratorFramework curator;
+
+ public CuratorCaches(String zooKeepers, int sessionTimeout) {
+ this(CuratorSession.getSession(zooKeepers, sessionTimeout));
+ }
+
+ public CuratorCaches(CuratorFramework curator) {
+ this.curator = curator;
+ this.nodeCache = new HashMap<String,NodeCache>();
+ this.childrenCache = new HashMap<String,PathChildrenCache>();
+ }
+
+ public synchronized List<ChildData> getChildren(final String zPath) {
+ return getChildren(zPath, null);
+ }
+
+ public synchronized List<ChildData> getChildren(String zPath, PathChildrenCacheListener listener) {
+ PathChildrenCache cache = childrenCache.get(zPath);
+ if (cache == null) {
+ cache = new PathChildrenCache(curator, zPath, true);
+ if (listener != null) {
+ cache.getListenable().addListener(listener);
+ }
+ try {
+ log.debug("Starting cache against " + zPath + (listener!=null? " using listener " + listener:""));
+ cache.start(StartMode.BUILD_INITIAL_CACHE);
+ // I'll do it myself!
+ if (listener != null)
+ for (ChildData cd : cache.getCurrentData()) {
+ listener.childEvent(curator, new PathChildrenCacheEvent(Type.INITIALIZED, cd));
+ }
+
+ // Because parent's children are being watched, we don't need to cache the individual node
+ // UNLESS we have a listener on it
+ for (ChildData child : cache.getCurrentData()) {
+ NodeCache childCache = nodeCache.get(child.getPath());
+ if (childCache != null && childCache.getListenable().size() == 0) {
+ log.debug("Removing cache " + childCache.getCurrentData().getPath() + " because parent cache was added");
+ childCache.close();
+ nodeCache.remove(child.getPath());
+ }
+ }
+ } catch (Exception e) {
+ log.error(e, e);
+ try {
+ cache.close();
+ } catch (IOException e1) {
+ // We're already in a bad state at this point, I think, but just in case
+ log.error(e, e);
+ }
+ return null;
+ }
+ childrenCache.put(zPath, cache);
+ } else if (listener != null) {
+ log.debug("LISTENER- cache is null for path " + zPath + ", but got listener " + listener.getClass() + ". this is a broken case!");
+ }
+ return cache.getCurrentData();
+ }
+
+ public List<String> getChildKeys(final String zPath) {
+ List<String> toRet = new ArrayList<String>();
+ for (ChildData child : getChildren(zPath)) {
+ toRet.add(CuratorUtil.getNodeName(child));
+ }
+ return toRet;
+ }
+
+ public synchronized ChildData get(final String zPath) {
+ NodeCache cache = nodeCache.get(zPath);
+ if (cache == null) {
+ PathChildrenCache cCache = childrenCache.get(CuratorUtil.getNodeParent(zPath));
+ if (cCache != null) {
+ return cCache.getCurrentData(zPath);
+ }
+ cache = new NodeCache(curator, zPath);
+ try {
+ cache.start(true);
+ } catch (Exception e) {
+ log.error(e, e);
+ try {
+ cache.close();
+ } catch (IOException e1) {
+ // We're already in a bad state at this point, I think, but just in case
+ log.error(e, e);
+ }
+ return null;
+ }
+ nodeCache.put(zPath, cache);
+ }
+
+ return cache.getCurrentData();
+ }
+
+ private synchronized void remove(String zPath) {
+ if (log.isTraceEnabled())
+ log.trace("removing " + zPath + " from cache");
+ NodeCache nc = nodeCache.get(zPath);
+ if (nc != null) {
+ try {
+ nc.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ }
+
+ PathChildrenCache pc = childrenCache.get(zPath);
+ if (pc != null) {
+ try {
+ pc.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ }
+
+ nodeCache.remove(zPath);
+ childrenCache.remove(zPath);
+ }
+
+ public synchronized void clear() {
+ for (NodeCache nc : nodeCache.values()) {
+ try {
+ nc.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ }
+ for (PathChildrenCache pc : childrenCache.values()) {
+ try {
+ pc.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ }
+
+ nodeCache.clear();
+ childrenCache.clear();
+ }
+
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
+ public synchronized void clear(String zPath) {
+ List<String> pathsToRemove = new ArrayList<String>();
+ for (Iterator<String> i = nodeCache.keySet().iterator(); i.hasNext();) {
+ String path = i.next();
+ if (path.startsWith(zPath))
+ pathsToRemove.add(path);
+ }
+
+ for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
+ String path = i.next();
+ if (path.startsWith(zPath))
+ pathsToRemove.add(path);
+ }
+
+ for (String path : pathsToRemove)
+ remove(path);
+ }
+
+ private static Map<String,CuratorCaches> instances = new HashMap<String,CuratorCaches>();
+
+ public static synchronized CuratorCaches getInstance(String zooKeepers, int sessionTimeout) {
+ String key = zooKeepers + ":" + sessionTimeout;
+ CuratorCaches zc = instances.get(key);
+ if (zc == null) {
+ zc = new CuratorCaches(zooKeepers, sessionTimeout);
+ instances.put(key, zc);
+ }
+
+ return zc;
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+
+/**
+ * Simple wrapper to deal with exceptions from Curator which are not KeeperExceptions and IterruptedExceptions
+ */
+public class CuratorException extends Exception {
+ Exception e;
+ public CuratorException(Exception e) {
+ super(e);
+ this.e = e;
+ }
+
+ private static final long serialVersionUID = 4874809817861147889L;
+
+ public String toString() {
+ return this.getClass().getName() + ": " + e.toString();
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorReader {
+
+ protected String keepers;
+ protected int timeout;
+
+ private CuratorFramework curator;
+
+ public CuratorReader(String zooKeepers, int sessionTimeout) {
+ this(CuratorSession.getSession(zooKeepers, sessionTimeout));
+ }
+
+ public CuratorReader(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+ this(CuratorSession.getSession(zooKeepers, sessionTimeout, scheme, auth));
+ }
+
+ public CuratorReader(CuratorFramework curator) {
+ this.curator = curator;
+ }
+
+ @Deprecated
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
+ public byte[] getData(String zPath) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().getData().forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().getData().storingStatIn(stat).forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().checkExists().forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().checkExists().usingWatcher(watcher).forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().getChildren().forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().getChildren().usingWatcher(watcher).forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public boolean exists(String zPath) throws KeeperException, InterruptedException {
+ return getStatus(zPath) != null;
+ }
+
+ public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+ return getStatus(zPath, watcher) != null;
+ }
+
+ public void sync(final String path) throws KeeperException, InterruptedException {
+ try {
+ getCurator().sync().forPath(path);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public List<ACL> getACL(String zPath) throws KeeperException, InterruptedException {
+ try {
+ return getCurator().getACL().forPath(zPath);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.security.SecurityPermission;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorReaderWriter extends CuratorReader {
+ private static SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
+
+ protected CuratorReaderWriter(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+ super(constructCurator(zooKeepers, sessionTimeout, scheme, auth));
+ }
+
+ private static CuratorReaderWriter instance = null;
+
+ public static synchronized CuratorReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
+ if (instance == null)
+ instance = new CuratorReaderWriter(zookeepers, timeInMillis, scheme, auth);
+ return instance;
+ }
+
+ private static CuratorFramework constructCurator(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(ZOOWRITER_PERMISSION);
+ }
+ return CuratorSession.getSession(zooKeepers, sessionTimeout, scheme, auth);
+ }
+
+ public void recursiveDelete(String zPath) throws KeeperException, InterruptedException {
+ recursiveDelete(zPath, -1);
+ }
+
+ @Deprecated
+ public void recursiveDelete(String zPath, int version) throws KeeperException, InterruptedException {
+ CuratorUtil.recursiveDelete(getCurator(), zPath, version);
+ }
+
+ public static final List<ACL> PRIVATE;
+ private static final List<ACL> PUBLIC;
+ static {
+ PRIVATE = new ArrayList<ACL>();
+ PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
+ PUBLIC = new ArrayList<ACL>();
+ PUBLIC.addAll(PRIVATE);
+ PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+ }
+
+ public enum NodeExistsPolicy {
+ SKIP, OVERWRITE, FAIL, SEQUENTIAL
+ }
+
+ private String putData(String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls) throws KeeperException,
+ InterruptedException {
+ if (policy == null)
+ policy = NodeExistsPolicy.FAIL;
+
+ CuratorFramework curator = getCurator();
+ try {
+ boolean exists = curator.checkExists().forPath(zPath) != null;
+
+
+ if (!exists || policy.equals(NodeExistsPolicy.SEQUENTIAL)) {
+ return curator.create().withMode(mode).withACL(acls).forPath(zPath, data);
+ }
+ else if (policy.equals(NodeExistsPolicy.OVERWRITE)) {
+ curator.setData().withVersion(version).forPath(zPath, data);
+ return zPath;
+ }
+ return null;
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ /**
+ * Create a persistent node with the default ACL
+ *
+ * @return true if the node was created or altered; false if it was skipped
+ */
+ public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putPersistentData(zPath, data, -1, policy);
+ }
+
+ public boolean putPersistentDataWithACL(String zPath, byte[] data, NodeExistsPolicy policy, List<ACL> acls)
+ throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.PERSISTENT, -1, policy, acls) != null;
+ }
+
+ public boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC) != null;
+ }
+
+ public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE) != null;
+ }
+
+ public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.PERSISTENT_SEQUENTIAL, -1, NodeExistsPolicy.SEQUENTIAL, PUBLIC);
+ }
+
+ public boolean putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.EPHEMERAL, -1, null, PUBLIC) != null;
+ }
+
+ public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+ return putData(zPath, data, CreateMode.EPHEMERAL_SEQUENTIAL, -1, NodeExistsPolicy.SEQUENTIAL, PUBLIC);
+ }
+
+ public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+ Stat stat = null;
+ if (!exists(source))
+ throw KeeperException.create(KeeperException.Code.NONODE, source);
+ if (exists(destination)) {
+ switch (policy) {
+ case OVERWRITE:
+ break;
+ case SKIP:
+ case SEQUENTIAL:
+ return;
+ case FAIL:
+ default:
+ throw KeeperException.create(KeeperException.Code.NODEEXISTS, source);
+ }
+ }
+
+ stat = new Stat();
+ byte[] data = getData(source, stat);
+ if (stat.getEphemeralOwner() == 0) {
+ if (data == null)
+ throw KeeperException.create(KeeperException.Code.NONODE, source);
+ putPersistentData(destination, data, policy);
+ if (stat.getNumChildren() > 0)
+ for (String child : getChildren(source))
+ recursiveCopyPersistent(source + "/" + child, destination + "/" + child, policy);
+ }
+ }
+
+ public void delete(String path, int version) throws InterruptedException, KeeperException {
+ try {
+ getCurator().delete().withVersion(version).forPath(path);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public interface Mutator {
+ byte[] mutate(byte[] currentValue) throws Exception;
+ }
+
+ public byte[] mutate(String zPath, byte[] createValue, boolean privateACL, Mutator mutator) throws Exception {
+ if (createValue != null) {
+ byte[] data = getData(zPath);
+ if (data == null) {
+ if (privateACL)
+ putPrivatePersistentData(zPath, createValue, NodeExistsPolicy.FAIL);
+ else
+ putPersistentData(zPath, createValue, NodeExistsPolicy.FAIL);
+ return createValue;
+ }
+ }
+
+ Stat stat = new Stat();
+ byte[] data = getData(zPath, stat);
+ data = mutator.mutate(data);
+ if (data == null)
+ return data;
+ if (privateACL)
+ putPrivatePersistentData(zPath, createValue, NodeExistsPolicy.OVERWRITE);
+ else
+ putPersistentData(zPath, createValue, NodeExistsPolicy.OVERWRITE);
+ return data;
+ }
+
+ public boolean isLockHeld(CuratorUtil.LockID lockID) throws KeeperException, InterruptedException {
+ try {
+ return CuratorUtil.isLockHeld(getCurator().getZookeeperClient().getZooKeeper(), lockID);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+
+ public void mkdirs(String path) throws KeeperException, InterruptedException {
+ try {
+ getCurator().create().creatingParentsIfNeeded().forPath(path);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+
+class CuratorSession {
+
+ private static final Logger log = Logger.getLogger(CuratorSession.class);
+
+ private static RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
+
+ private static Map<String,CuratorFramework> sessions = new HashMap<String,CuratorFramework>();
+
+ private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
+ return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth));
+ }
+
+ private static CuratorFramework constructCurator(String zookeeperConnectString, int sessionTimeoutMs, String namespace, String scheme, byte[] bytes) {
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().canBeReadOnly(true).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(retry)
+ .connectString(zookeeperConnectString);
+ if (scheme != null && bytes != null)
+ builder = builder.authorization(scheme, bytes);
+ if (namespace != null)
+ builder = builder.namespace(namespace);
+
+ CuratorFramework toRet = builder.build();
+ toRet.start();
+ return toRet;
+ }
+
+ public static synchronized CuratorFramework getSession(String zooKeepers, int timeout) {
+ return getSession(zooKeepers, timeout, null, null);
+ }
+
+ public static synchronized CuratorFramework getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
+
+ String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
+
+ // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+ String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
+ CuratorFramework curator = sessions.get(sessionKey);
+ if (curator != null && curator.getState() == CuratorFrameworkState.STOPPED) {
+ if (auth != null && sessions.get(readOnlySessionKey) == curator)
+ sessions.remove(readOnlySessionKey);
+ curator = null;
+ sessions.remove(sessionKey);
+ }
+
+ if (curator == null) {
+ log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth " + (auth==null? "null":new String(auth)));
+ curator = constructCurator(zooKeepers, timeout, null, scheme, auth);
+ sessions.put(sessionKey, curator);
+ if (auth != null && !sessions.containsKey(readOnlySessionKey))
+ sessions.put(readOnlySessionKey, curator);
+ }
+ return curator;
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorUtil {
+ public enum NodeMissingPolicy {
+ SKIP, CREATE, FAIL
+ }
+
+ public static String getNodeName(ChildData node) {
+ return getNodeName(node.getPath());
+ }
+
+ public static String getNodeName(String nodePath) {
+ return new File(nodePath).getName();
+ }
+
+ public static String getNodeParent(ChildData node) {
+ return getNodeParent(node.getPath());
+ }
+
+ public static String getNodeParent(String nodePath) {
+ return new File(nodePath).getParent();
+ }
+
+ public static void recursiveDelete(CuratorFramework curator, final String pathRoot, int version) throws KeeperException, InterruptedException {
+ PathUtils.validatePath(pathRoot);
+
+ List<String> tree = listSubTreeBFS(curator, pathRoot);
+ for (int i = tree.size() - 1; i >= 0; --i) {
+ // Delete the leaves first and eventually get rid of the root
+ try {
+ curator.delete().withVersion(version).forPath(tree.get(i));
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ } // Delete all versions of the node
+ }
+ }
+
+ private static List<String> listSubTreeBFS(CuratorFramework curator, final String pathRoot) throws KeeperException, InterruptedException {
+ Deque<String> queue = new LinkedList<String>();
+ List<String> tree = new ArrayList<String>();
+ queue.add(pathRoot);
+ tree.add(pathRoot);
+ while (true) {
+ String node = queue.pollFirst();
+ if (node == null) {
+ break;
+ }
+ List<String> children;
+ try {
+ children = curator.getChildren().forPath(node);
+ } catch (Exception e) {
+ throw CuratorUtil.manageException(e);
+ }
+ for (final String child : children) {
+ final String childPath = node + "/" + child;
+ queue.add(childPath);
+ tree.add(childPath);
+ }
+ }
+ return tree;
+ }
+
+ public static class LockID {
+ public long eid;
+ public String path;
+ public String node;
+
+ public LockID(String root, String serializedLID) {
+ String sa[] = serializedLID.split("\\$");
+ int lastSlash = sa[0].lastIndexOf('/');
+
+ if (sa.length != 2 || lastSlash < 0) {
+ throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+ }
+
+ if (lastSlash == 0)
+ path = root;
+ else
+ path = root + "/" + sa[0].substring(0, lastSlash);
+ node = sa[0].substring(lastSlash + 1);
+ eid = new BigInteger(sa[1], 16).longValue();
+ }
+
+ public LockID(String path, String node, long eid) {
+ this.path = path;
+ this.node = node;
+ this.eid = eid;
+ }
+
+ public String serialize(String root) {
+
+ return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
+ }
+
+ @Override
+ public String toString() {
+ return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
+ }
+ }
+
+ public static byte[] getLockData(CuratorCaches zc, String path) {
+
+ List<ChildData> children = zc.getChildren(path);
+
+ if (children == null || children.size() == 0) {
+ return null;
+ }
+
+ children = new ArrayList<ChildData>(children);
+ Collections.sort(children);
+
+ return children.get(0).getData();
+ }
+
+ public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+ while (true) {
+ try {
+ List<String> children = zk.getChildren(lid.path, false);
+
+ if (children.size() == 0) {
+ return false;
+ }
+
+ Collections.sort(children);
+
+ String lockNode = children.get(0);
+ if (!lid.node.equals(lockNode))
+ return false;
+
+ Stat stat = zk.exists(lid.path + "/" + lid.node, false);
+ return stat != null && stat.getEphemeralOwner() == lid.eid;
+ } catch (KeeperException.ConnectionLossException ex) {
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ }
+
+ /**
+ * Fluffer class. Right now keep it generic but as I probe Curator I can make exceptions better
+ */
+ public static RuntimeException manageException(Exception e) throws KeeperException, InterruptedException {
+ if (e instanceof KeeperException)
+ throw (KeeperException) e;
+ if (e instanceof InterruptedException)
+ throw (InterruptedException) e;
+ return new RuntimeException(e);
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java?rev=1497620&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java (added)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java Fri Jun 28 00:23:52 2013
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.curator;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+
+public class CuratorReaderWriter extends org.apache.accumulo.fate.curator.CuratorReaderWriter {
+ private static final String SCHEME = "digest";
+ private static final String USER = "accumulo";
+ private static CuratorReaderWriter instance = null;
+
+ private CuratorReaderWriter(String string, int timeInMillis, String secret) {
+ super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes());
+ }
+
+ public static CuratorReaderWriter getInstance() {
+ AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+ return getInstance(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), conf.get(Property.INSTANCE_SECRET));
+ }
+
+ public static CuratorReaderWriter getInstance(String secret) {
+ AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+ return getInstance(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), secret);
+ }
+
+ public static synchronized CuratorReaderWriter getInstance(String zkHosts, int timeout, String secret) {
+ if (instance == null) {
+ instance = new CuratorReaderWriter(zkHosts, timeout, secret);
+ }
+ return instance;
+ }
+}
Propchange: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java
------------------------------------------------------------------------------
svn:eol-style = native