You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bh...@apache.org on 2014/06/06 22:12:27 UTC
[1/3] git commit: ACCUMULO-2865 Add ZooCacheTest
Repository: accumulo
Updated Branches:
refs/heads/1.6.1-SNAPSHOT 18d6ca1bf -> 4950870de
refs/heads/master 809b73c05 -> 7749cbc7b
ACCUMULO-2865 Add ZooCacheTest
This adds a unit test for ZooCache. ZooCache itself was only changed slightly, with the
addition of methods to check if information was cached and improvements to its comments.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4950870d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4950870d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4950870d
Branch: refs/heads/1.6.1-SNAPSHOT
Commit: 4950870de6dcf26dc1aa78f63e39da513a0d92de
Parents: 18d6ca1
Author: Bill Havanki <bh...@cloudera.com>
Authored: Fri Jun 6 12:33:21 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Fri Jun 6 14:13:50 2014 -0400
----------------------------------------------------------------------
.../accumulo/fate/zookeeper/ZooCache.java | 85 ++++-
.../accumulo/fate/zookeeper/ZooCacheTest.java | 345 +++++++++++++++++++
2 files changed, 423 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4950870d/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index 99ffd04..d9eb243 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.fate.zookeeper;
+import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -27,7 +28,6 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -38,8 +38,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
- * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
- *
+ * A cache for values stored in ZooKeeper. Values are kept up to date as they
+ * change.
*/
public class ZooCache {
private static final Logger log = Logger.getLogger(ZooCache.class);
@@ -100,14 +100,35 @@ public class ZooCache {
}
}
+ /**
+ * Creates a new cache.
+ *
+ * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout ZooKeeper session timeout
+ */
public ZooCache(String zooKeepers, int sessionTimeout) {
this(zooKeepers, sessionTimeout, null);
}
+ /**
+ * Creates a new cache. The given watcher is called whenever a watched node
+ * changes.
+ *
+ * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout ZooKeeper session timeout
+ * @param watcher watcher object
+ */
public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
this(new ZooReader(zooKeepers, sessionTimeout), watcher);
}
+ /**
+ * Creates a new cache. The given watcher is called whenever a watched node
+ * changes.
+ *
+ * @param reader ZooKeeper reader
+ * @param watcher watcher object
+ */
public ZooCache(ZooReader reader, Watcher watcher) {
this.zReader = reader;
this.cache = new HashMap<String,byte[]>();
@@ -134,7 +155,7 @@ public class ZooCache {
} catch (KeeperException e) {
if (e.code() == Code.NONODE) {
- log.error("Looked up non existant node in cache " + e.getPath(), e);
+ log.error("Looked up non-existent node in cache " + e.getPath(), e);
}
log.warn("Zookeeper error, will retry", e);
} catch (InterruptedException e) {
@@ -155,6 +176,12 @@ public class ZooCache {
}
}
+ /**
+ * Gets the children of the given node. A watch is established by this call.
+ *
+ * @param zPath path of node
+ * @return children list, or null if node has no children or does not exist
+ */
public synchronized List<String> getChildren(final String zPath) {
ZooRunnable zr = new ZooRunnable() {
@@ -186,10 +213,25 @@ public class ZooCache {
return Collections.unmodifiableList(children);
}
+ /**
+ * Gets data at the given path. Status information is not returned. A watch is
+ * established by this call.
+ *
+ * @param zPath path to get
+ * @return path data, or null if non-existent
+ */
public synchronized byte[] get(final String zPath) {
return get(zPath, null);
}
+ /**
+ * Gets data at the given path, filling status information into the given
+ * <code>Stat</code> object. A watch is established by this call.
+ *
+ * @param zPath path to get
+ * @param stat status object to populate
+ * @return path data, or null if non-existent
+ */
public synchronized byte[] get(final String zPath, Stat stat) {
ZooRunnable zr = new ZooRunnable() {
@@ -200,11 +242,11 @@ public class ZooCache {
return;
/*
- * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existance, it will be added to
- * the cache. But this notification of a node coming into existance will only be given if exists() was previously called.
+ * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
+ * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
*
* If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
- * non-existance can not be cached.
+ * non-existence can not be cached.
*/
Stat stat = zooKeeper.exists(zPath, watcher);
@@ -270,12 +312,41 @@ public class ZooCache {
statCache.remove(zPath);
}
+ /**
+ * Clears this cache.
+ */
public synchronized void clear() {
cache.clear();
childrenCache.clear();
statCache.clear();
}
+ /**
+ * Checks if a data value (or lack of one) is cached.
+ *
+ * @param zPath path of node
+ * @return true if data value is cached
+ */
+ @VisibleForTesting
+ synchronized boolean dataCached(String zPath) {
+ return cache.containsKey(zPath);
+ }
+ /**
+ * Checks if children of a node (or lack of them) are cached.
+ *
+ * @param zPath path of node
+ * @return true if children are cached
+ */
+ @VisibleForTesting
+ synchronized boolean childrenCached(String zPath) {
+ return childrenCache.containsKey(zPath);
+ }
+
+ /**
+ * Clears this cache of all information about nodes rooted at the given path.
+ *
+ * @param zPath path of top node
+ */
public synchronized void clear(String zPath) {
for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4950870d/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
new file mode 100644
index 0000000..e3db785
--- /dev/null
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import org.easymock.Capture;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class ZooCacheTest {
+ private static final String ZPATH = "/some/path/in/zk";
+ private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
+ private static final List<String> CHILDREN = java.util.Arrays.asList(new String[] {"huey", "dewey", "louie"});
+
+ private ZooReader zr;
+ private ZooKeeper zk;
+ private ZooCache zc;
+
+ @Before
+ public void setUp() throws Exception {
+ zr = createMock(ZooReader.class);
+ zk = createStrictMock(ZooKeeper.class);
+ expect(zr.getZooKeeper()).andReturn(zk);
+ expectLastCall().anyTimes();
+ replay(zr);
+
+ zc = new ZooCache(zr, null);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ testGet(false);
+ }
+
+ @Test
+ public void testGet_FillStat() throws Exception {
+ testGet(true);
+ }
+
+ private void testGet(boolean fillStat) throws Exception {
+ Stat myStat = null;
+ if (fillStat) {
+ myStat = new Stat();
+ }
+ long now = System.currentTimeMillis();
+ Stat existsStat = new Stat();
+ existsStat.setMtime(now);
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertFalse(zc.dataCached(ZPATH));
+ assertArrayEquals(DATA, (fillStat ? zc.get(ZPATH, myStat) : zc.get(ZPATH)));
+ verify(zk);
+ if (fillStat) {
+ assertEquals(now, myStat.getMtime());
+ }
+
+ assertTrue(zc.dataCached(ZPATH));
+ assertSame(DATA, zc.get(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGet_NonExistent() throws Exception {
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(null);
+ replay(zk);
+
+ assertNull(zc.get(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGet_Retry_NoNode() throws Exception {
+ testGet_Retry(new KeeperException.NoNodeException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry_ConnectionLoss() throws Exception {
+ testGet_Retry(new KeeperException.ConnectionLossException());
+ }
+
+ @Test
+ public void testGet_Retry_BadVersion() throws Exception {
+ testGet_Retry(new KeeperException.BadVersionException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry_Interrupted() throws Exception {
+ testGet_Retry(new InterruptedException());
+ }
+
+ private void testGet_Retry(Exception e) throws Exception {
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andThrow(e);
+ Stat existsStat = new Stat();
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertArrayEquals(DATA, zc.get(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGet_Retry2_NoNode() throws Exception {
+ testGet_Retry2(new KeeperException.NoNodeException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry2_ConnectionLoss() throws Exception {
+ testGet_Retry2(new KeeperException.ConnectionLossException());
+ }
+
+ @Test
+ public void testGet_Retry2_BadVersion() throws Exception {
+ testGet_Retry2(new KeeperException.BadVersionException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry2_Interrupted() throws Exception {
+ testGet_Retry2(new InterruptedException());
+ }
+
+ private void testGet_Retry2(Exception e) throws Exception {
+ Stat existsStat = new Stat();
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andThrow(e);
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertArrayEquals(DATA, zc.get(ZPATH));
+ verify(zk);
+ }
+
+ // ---
+
+ @Test
+ public void testGetChildren() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(CHILDREN);
+ replay(zk);
+
+ assertFalse(zc.childrenCached(ZPATH));
+ assertEquals(CHILDREN, zc.getChildren(ZPATH));
+ verify(zk);
+
+ assertTrue(zc.childrenCached(ZPATH));
+ // cannot check for sameness, return value is wrapped each time
+ assertEquals(CHILDREN, zc.getChildren(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGetChildren_NoKids() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(null);
+ replay(zk);
+
+ assertNull(zc.getChildren(ZPATH));
+ verify(zk);
+
+ assertNull(zc.getChildren(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGetChildren_Retry() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andThrow(new KeeperException.BadVersionException(ZPATH));
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(CHILDREN);
+ replay(zk);
+
+ assertEquals(CHILDREN, zc.getChildren(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGetChildren_EatNoNode() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andThrow(new KeeperException.NoNodeException(ZPATH));
+ replay(zk);
+
+ assertNull(zc.getChildren(ZPATH));
+ verify(zk);
+ }
+
+ private static class TestWatcher implements Watcher {
+ private final WatchedEvent expectedEvent;
+ private boolean wasCalled;
+
+ TestWatcher(WatchedEvent event) {
+ expectedEvent = event;
+ wasCalled = false;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ assertSame(expectedEvent, event);
+ wasCalled = true;
+ }
+
+ boolean wasCalled() {
+ return wasCalled;
+ }
+ }
+
+ @Test
+ public void testWatchDataNode_Deleted() throws Exception {
+ testWatchDataNode(DATA, Watcher.Event.EventType.NodeDeleted, false);
+ }
+
+ @Test
+ public void testWatchDataNode_DataChanged() throws Exception {
+ testWatchDataNode(DATA, Watcher.Event.EventType.NodeDataChanged, false);
+ }
+
+ @Test
+ public void testWatchDataNode_Created() throws Exception {
+ testWatchDataNode(null, Watcher.Event.EventType.NodeCreated, false);
+ }
+
+ @Test
+ public void testWatchDataNode_NoneSyncConnected() throws Exception {
+ testWatchDataNode(null, Watcher.Event.EventType.None, true);
+ }
+
+ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType eventType, boolean stillCached) throws Exception {
+ WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchData(initialData);
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertEquals(stillCached, zc.dataCached(ZPATH));
+ }
+
+ private Watcher watchData(byte[] initialData) throws Exception {
+ Capture<Watcher> cw = new Capture<Watcher>();
+ Stat existsStat = new Stat();
+ if (initialData != null) {
+ expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(initialData);
+ } else {
+ expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null);
+ }
+ replay(zk);
+ zc.get(ZPATH);
+ assertTrue(zc.dataCached(ZPATH));
+
+ return cw.getValue();
+ }
+
+ @Test
+ public void testWatchDataNode_Disconnected() throws Exception {
+ testWatchDataNode_Clear(Watcher.Event.KeeperState.Disconnected);
+ }
+
+ @Test
+ public void testWatchDataNode_Expired() throws Exception {
+ testWatchDataNode_Clear(Watcher.Event.KeeperState.Expired);
+ }
+
+ private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception {
+ WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchData(DATA);
+ assertTrue(zc.dataCached(ZPATH));
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertFalse(zc.dataCached(ZPATH));
+ }
+
+ @Test
+ public void testWatchChildrenNode_Deleted() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeDeleted, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_ChildrenChanged() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_Created() throws Exception {
+ testWatchChildrenNode(null, Watcher.Event.EventType.NodeCreated, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_NoneSyncConnected() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.None, true);
+ }
+
+ private void testWatchChildrenNode(List<String> initialChildren, Watcher.Event.EventType eventType, boolean stillCached) throws Exception {
+ WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchChildren(initialChildren);
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertEquals(stillCached, zc.childrenCached(ZPATH));
+ }
+
+ private Watcher watchChildren(List<String> initialChildren) throws Exception {
+ Capture<Watcher> cw = new Capture<Watcher>();
+ expect(zk.getChildren(eq(ZPATH), capture(cw))).andReturn(initialChildren);
+ replay(zk);
+ zc.getChildren(ZPATH);
+ assertTrue(zc.childrenCached(ZPATH));
+
+ return cw.getValue();
+ }
+}
[3/3] git commit: Merge branch '1.6.1-SNAPSHOT'
Posted by bh...@apache.org.
Merge branch '1.6.1-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7749cbc7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7749cbc7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7749cbc7
Branch: refs/heads/master
Commit: 7749cbc7b3177d46dacc76833ba460269e29ad19
Parents: 809b73c 4950870
Author: Bill Havanki <bh...@cloudera.com>
Authored: Fri Jun 6 16:11:50 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Fri Jun 6 16:11:50 2014 -0400
----------------------------------------------------------------------
.../accumulo/fate/zookeeper/ZooCache.java | 84 ++++-
.../accumulo/fate/zookeeper/ZooCacheTest.java | 345 +++++++++++++++++++
2 files changed, 423 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: ACCUMULO-2865 Add ZooCacheTest
Posted by bh...@apache.org.
ACCUMULO-2865 Add ZooCacheTest
This adds a unit test for ZooCache. ZooCache itself was only changed slightly, with the
addition of methods to check if information was cached and improvements to its comments.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4950870d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4950870d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4950870d
Branch: refs/heads/master
Commit: 4950870de6dcf26dc1aa78f63e39da513a0d92de
Parents: 18d6ca1
Author: Bill Havanki <bh...@cloudera.com>
Authored: Fri Jun 6 12:33:21 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Fri Jun 6 14:13:50 2014 -0400
----------------------------------------------------------------------
.../accumulo/fate/zookeeper/ZooCache.java | 85 ++++-
.../accumulo/fate/zookeeper/ZooCacheTest.java | 345 +++++++++++++++++++
2 files changed, 423 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4950870d/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index 99ffd04..d9eb243 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.fate.zookeeper;
+import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -27,7 +28,6 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -38,8 +38,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
- * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
- *
+ * A cache for values stored in ZooKeeper. Values are kept up to date as they
+ * change.
*/
public class ZooCache {
private static final Logger log = Logger.getLogger(ZooCache.class);
@@ -100,14 +100,35 @@ public class ZooCache {
}
}
+ /**
+ * Creates a new cache.
+ *
+ * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout ZooKeeper session timeout
+ */
public ZooCache(String zooKeepers, int sessionTimeout) {
this(zooKeepers, sessionTimeout, null);
}
+ /**
+ * Creates a new cache. The given watcher is called whenever a watched node
+ * changes.
+ *
+ * @param zooKeepers comma-separated list of ZooKeeper host[:port]s
+ * @param sessionTimeout ZooKeeper session timeout
+ * @param watcher watcher object
+ */
public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
this(new ZooReader(zooKeepers, sessionTimeout), watcher);
}
+ /**
+ * Creates a new cache. The given watcher is called whenever a watched node
+ * changes.
+ *
+ * @param reader ZooKeeper reader
+ * @param watcher watcher object
+ */
public ZooCache(ZooReader reader, Watcher watcher) {
this.zReader = reader;
this.cache = new HashMap<String,byte[]>();
@@ -134,7 +155,7 @@ public class ZooCache {
} catch (KeeperException e) {
if (e.code() == Code.NONODE) {
- log.error("Looked up non existant node in cache " + e.getPath(), e);
+ log.error("Looked up non-existent node in cache " + e.getPath(), e);
}
log.warn("Zookeeper error, will retry", e);
} catch (InterruptedException e) {
@@ -155,6 +176,12 @@ public class ZooCache {
}
}
+ /**
+ * Gets the children of the given node. A watch is established by this call.
+ *
+ * @param zPath path of node
+ * @return children list, or null if node has no children or does not exist
+ */
public synchronized List<String> getChildren(final String zPath) {
ZooRunnable zr = new ZooRunnable() {
@@ -186,10 +213,25 @@ public class ZooCache {
return Collections.unmodifiableList(children);
}
+ /**
+ * Gets data at the given path. Status information is not returned. A watch is
+ * established by this call.
+ *
+ * @param zPath path to get
+ * @return path data, or null if non-existent
+ */
public synchronized byte[] get(final String zPath) {
return get(zPath, null);
}
+ /**
+ * Gets data at the given path, filling status information into the given
+ * <code>Stat</code> object. A watch is established by this call.
+ *
+ * @param zPath path to get
+ * @param stat status object to populate
+ * @return path data, or null if non-existent
+ */
public synchronized byte[] get(final String zPath, Stat stat) {
ZooRunnable zr = new ZooRunnable() {
@@ -200,11 +242,11 @@ public class ZooCache {
return;
/*
- * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existance, it will be added to
- * the cache. But this notification of a node coming into existance will only be given if exists() was previously called.
+ * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
+ * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
*
* If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
- * non-existance can not be cached.
+ * non-existence can not be cached.
*/
Stat stat = zooKeeper.exists(zPath, watcher);
@@ -270,12 +312,41 @@ public class ZooCache {
statCache.remove(zPath);
}
+ /**
+ * Clears this cache.
+ */
public synchronized void clear() {
cache.clear();
childrenCache.clear();
statCache.clear();
}
+ /**
+ * Checks if a data value (or lack of one) is cached.
+ *
+ * @param zPath path of node
+ * @return true if data value is cached
+ */
+ @VisibleForTesting
+ synchronized boolean dataCached(String zPath) {
+ return cache.containsKey(zPath);
+ }
+ /**
+ * Checks if children of a node (or lack of them) are cached.
+ *
+ * @param zPath path of node
+ * @return true if children are cached
+ */
+ @VisibleForTesting
+ synchronized boolean childrenCached(String zPath) {
+ return childrenCache.containsKey(zPath);
+ }
+
+ /**
+ * Clears this cache of all information about nodes rooted at the given path.
+ *
+ * @param zPath path of top node
+ */
public synchronized void clear(String zPath) {
for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4950870d/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
new file mode 100644
index 0000000..e3db785
--- /dev/null
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/ZooCacheTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import org.easymock.Capture;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class ZooCacheTest {
+ private static final String ZPATH = "/some/path/in/zk";
+ private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
+ private static final List<String> CHILDREN = java.util.Arrays.asList(new String[] {"huey", "dewey", "louie"});
+
+ private ZooReader zr;
+ private ZooKeeper zk;
+ private ZooCache zc;
+
+ @Before
+ public void setUp() throws Exception {
+ zr = createMock(ZooReader.class);
+ zk = createStrictMock(ZooKeeper.class);
+ expect(zr.getZooKeeper()).andReturn(zk);
+ expectLastCall().anyTimes();
+ replay(zr);
+
+ zc = new ZooCache(zr, null);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ testGet(false);
+ }
+
+ @Test
+ public void testGet_FillStat() throws Exception {
+ testGet(true);
+ }
+
+ private void testGet(boolean fillStat) throws Exception {
+ Stat myStat = null;
+ if (fillStat) {
+ myStat = new Stat();
+ }
+ long now = System.currentTimeMillis();
+ Stat existsStat = new Stat();
+ existsStat.setMtime(now);
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertFalse(zc.dataCached(ZPATH));
+ assertArrayEquals(DATA, (fillStat ? zc.get(ZPATH, myStat) : zc.get(ZPATH)));
+ verify(zk);
+ if (fillStat) {
+ assertEquals(now, myStat.getMtime());
+ }
+
+ assertTrue(zc.dataCached(ZPATH));
+ assertSame(DATA, zc.get(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGet_NonExistent() throws Exception {
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(null);
+ replay(zk);
+
+ assertNull(zc.get(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGet_Retry_NoNode() throws Exception {
+ testGet_Retry(new KeeperException.NoNodeException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry_ConnectionLoss() throws Exception {
+ testGet_Retry(new KeeperException.ConnectionLossException());
+ }
+
+ @Test
+ public void testGet_Retry_BadVersion() throws Exception {
+ testGet_Retry(new KeeperException.BadVersionException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry_Interrupted() throws Exception {
+ testGet_Retry(new InterruptedException());
+ }
+
+ private void testGet_Retry(Exception e) throws Exception {
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andThrow(e);
+ Stat existsStat = new Stat();
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertArrayEquals(DATA, zc.get(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGet_Retry2_NoNode() throws Exception {
+ testGet_Retry2(new KeeperException.NoNodeException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry2_ConnectionLoss() throws Exception {
+ testGet_Retry2(new KeeperException.ConnectionLossException());
+ }
+
+ @Test
+ public void testGet_Retry2_BadVersion() throws Exception {
+ testGet_Retry2(new KeeperException.BadVersionException(ZPATH));
+ }
+
+ @Test
+ public void testGet_Retry2_Interrupted() throws Exception {
+ testGet_Retry2(new InterruptedException());
+ }
+
+ private void testGet_Retry2(Exception e) throws Exception {
+ Stat existsStat = new Stat();
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andThrow(e);
+ expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA);
+ replay(zk);
+
+ assertArrayEquals(DATA, zc.get(ZPATH));
+ verify(zk);
+ }
+
+ // ---
+
+ @Test
+ public void testGetChildren() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(CHILDREN);
+ replay(zk);
+
+ assertFalse(zc.childrenCached(ZPATH));
+ assertEquals(CHILDREN, zc.getChildren(ZPATH));
+ verify(zk);
+
+ assertTrue(zc.childrenCached(ZPATH));
+ // cannot check for sameness, return value is wrapped each time
+ assertEquals(CHILDREN, zc.getChildren(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGetChildren_NoKids() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(null);
+ replay(zk);
+
+ assertNull(zc.getChildren(ZPATH));
+ verify(zk);
+
+ assertNull(zc.getChildren(ZPATH)); // cache hit
+ }
+
+ @Test
+ public void testGetChildren_Retry() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andThrow(new KeeperException.BadVersionException(ZPATH));
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andReturn(CHILDREN);
+ replay(zk);
+
+ assertEquals(CHILDREN, zc.getChildren(ZPATH));
+ verify(zk);
+ }
+
+ @Test
+ public void testGetChildren_EatNoNode() throws Exception {
+ expect(zk.getChildren(eq(ZPATH), anyObject(Watcher.class))).andThrow(new KeeperException.NoNodeException(ZPATH));
+ replay(zk);
+
+ assertNull(zc.getChildren(ZPATH));
+ verify(zk);
+ }
+
+ private static class TestWatcher implements Watcher {
+ private final WatchedEvent expectedEvent;
+ private boolean wasCalled;
+
+ TestWatcher(WatchedEvent event) {
+ expectedEvent = event;
+ wasCalled = false;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ assertSame(expectedEvent, event);
+ wasCalled = true;
+ }
+
+ boolean wasCalled() {
+ return wasCalled;
+ }
+ }
+
+ @Test
+ public void testWatchDataNode_Deleted() throws Exception {
+ testWatchDataNode(DATA, Watcher.Event.EventType.NodeDeleted, false);
+ }
+
+ @Test
+ public void testWatchDataNode_DataChanged() throws Exception {
+ testWatchDataNode(DATA, Watcher.Event.EventType.NodeDataChanged, false);
+ }
+
+ @Test
+ public void testWatchDataNode_Created() throws Exception {
+ testWatchDataNode(null, Watcher.Event.EventType.NodeCreated, false);
+ }
+
+ @Test
+ public void testWatchDataNode_NoneSyncConnected() throws Exception {
+ testWatchDataNode(null, Watcher.Event.EventType.None, true);
+ }
+
+ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType eventType, boolean stillCached) throws Exception {
+ WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchData(initialData);
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertEquals(stillCached, zc.dataCached(ZPATH));
+ }
+
+ private Watcher watchData(byte[] initialData) throws Exception {
+ Capture<Watcher> cw = new Capture<Watcher>();
+ Stat existsStat = new Stat();
+ if (initialData != null) {
+ expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(existsStat);
+ expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(initialData);
+ } else {
+ expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null);
+ }
+ replay(zk);
+ zc.get(ZPATH);
+ assertTrue(zc.dataCached(ZPATH));
+
+ return cw.getValue();
+ }
+
+ @Test
+ public void testWatchDataNode_Disconnected() throws Exception {
+ testWatchDataNode_Clear(Watcher.Event.KeeperState.Disconnected);
+ }
+
+ @Test
+ public void testWatchDataNode_Expired() throws Exception {
+ testWatchDataNode_Clear(Watcher.Event.KeeperState.Expired);
+ }
+
+ private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception {
+ WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchData(DATA);
+ assertTrue(zc.dataCached(ZPATH));
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertFalse(zc.dataCached(ZPATH));
+ }
+
+ @Test
+ public void testWatchChildrenNode_Deleted() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeDeleted, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_ChildrenChanged() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_Created() throws Exception {
+ testWatchChildrenNode(null, Watcher.Event.EventType.NodeCreated, false);
+ }
+
+ @Test
+ public void testWatchChildrenNode_NoneSyncConnected() throws Exception {
+ testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.None, true);
+ }
+
+ private void testWatchChildrenNode(List<String> initialChildren, Watcher.Event.EventType eventType, boolean stillCached) throws Exception {
+ WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
+ TestWatcher exw = new TestWatcher(event);
+ zc = new ZooCache(zr, exw);
+
+ Watcher w = watchChildren(initialChildren);
+ w.process(event);
+ assertTrue(exw.wasCalled());
+ assertEquals(stillCached, zc.childrenCached(ZPATH));
+ }
+
+ private Watcher watchChildren(List<String> initialChildren) throws Exception {
+ Capture<Watcher> cw = new Capture<Watcher>();
+ expect(zk.getChildren(eq(ZPATH), capture(cw))).andReturn(initialChildren);
+ replay(zk);
+ zc.getChildren(ZPATH);
+ assertTrue(zc.childrenCached(ZPATH));
+
+ return cw.getValue();
+ }
+}