You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/31 21:41:19 UTC
[1/5] curator git commit: refactoring
Repository: curator
Updated Branches:
refs/heads/persistent-watch 076583d14 -> 38c766310
refactoring
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5b0a9f56
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5b0a9f56
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5b0a9f56
Branch: refs/heads/persistent-watch
Commit: 5b0a9f56e7d050eedfea0618f90c58d718441d3f
Parents: 076583d
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 14:09:53 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 14:09:53 2016 -0500
----------------------------------------------------------------------
.../framework/recipes/watch/CacheAction.java | 5 +-
.../framework/recipes/watch/CacheFilter.java | 2 +-
.../framework/recipes/watch/CacheFilters.java | 45 +++++++++++++++
.../recipes/watch/CuratorCacheBuilder.java | 23 ++++----
.../recipes/watch/InternalCuratorCache.java | 47 +++++++++++++---
.../recipes/watch/InternalNodeCache.java | 8 +--
.../recipes/watch/NoDataCacheFilter.java | 4 +-
.../framework/recipes/watch/RefreshFilter.java | 2 +-
.../framework/recipes/watch/RefreshFilters.java | 36 ++++++++++++
.../recipes/watch/SingleLevelCacheFilter.java | 59 --------------------
.../recipes/watch/SingleLevelRefreshFilter.java | 37 ------------
.../recipes/watch/StandardCacheFilter.java | 46 +++++++++++++++
.../recipes/watch/StatsOnlyCacheFilter.java | 28 ----------
.../recipes/watch/TreeRefreshFilter.java | 28 ----------
.../watch/TestSingleLevelCuratorCache.java | 5 +-
15 files changed, 190 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
index a59fe99..b39457a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -22,6 +22,7 @@ public enum CacheAction
{
NOT_STORED,
PATH_ONLY,
- PATH_AND_DATA,
- PATH_AND_COMPRESSED_DATA
+ STAT_ONLY,
+ STAT_AND_DATA,
+ STAT_AND_COMPRESSED_DATA
}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
index 9923174..6cccc88 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
@@ -20,5 +20,5 @@ package org.apache.curator.framework.recipes.watch;
public interface CacheFilter
{
- CacheAction actionForPath(String path);
+ CacheAction actionForPath(String mainPath, String checkPath);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
new file mode 100644
index 0000000..171dea4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -0,0 +1,45 @@
+package org.apache.curator.framework.recipes.watch;
+
+public class CacheFilters
+{
+ private static final CacheFilter statAndData = new StandardCacheFilter(CacheAction.STAT_AND_DATA);
+ private static final CacheFilter compressedStatAndData = new StandardCacheFilter(CacheAction.STAT_AND_COMPRESSED_DATA);
+ private static final CacheFilter statOnly = new StandardCacheFilter(CacheAction.STAT_ONLY);
+ private static final CacheFilter pathOnly = new StandardCacheFilter(CacheAction.PATH_ONLY);
+
+ public static CacheFilter statAndData()
+ {
+ return statAndData;
+ }
+
+ public static CacheFilter compressedData()
+ {
+ return compressedStatAndData;
+ }
+
+ public static CacheFilter statOnly()
+ {
+ return statOnly;
+ }
+
+ public static CacheFilter pathOnly()
+ {
+ return pathOnly;
+ }
+
+ public static CacheFilter full(final CacheAction cacheAction)
+ {
+ return new CacheFilter()
+ {
+ @Override
+ public CacheAction actionForPath(String mainPath, String checkPath)
+ {
+ return cacheAction;
+ }
+ };
+ }
+
+ private CacheFilters()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index e22aa1c..341ac23 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.recipes.watch;
+import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import org.apache.curator.framework.CuratorFramework;
import java.util.Objects;
@@ -28,18 +29,11 @@ public class CuratorCacheBuilder
private final CuratorFramework client;
private final String path;
private CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
- private boolean singleNode;
- private RefreshFilter refreshFilter;
+ private boolean singleNode = false;
+ private RefreshFilter refreshFilter = null;
private boolean sendRefreshEvents = true;
private boolean refreshOnStart = true;
- private CacheFilter cacheFilter = new CacheFilter()
- {
- @Override
- public CacheAction actionForPath(String path)
- {
- return CacheAction.PATH_AND_DATA;
- }
- };
+ private CacheFilter cacheFilter = CacheFilters.statAndData();
public static CuratorCacheBuilder builder(CuratorFramework client, String path)
{
@@ -51,6 +45,7 @@ public class CuratorCacheBuilder
{
if ( singleNode )
{
+ Preconditions.checkState(refreshFilter == null, "Single node caches do not use RefreshFilters");
return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
}
return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
@@ -60,21 +55,23 @@ public class CuratorCacheBuilder
{
singleNode = true;
refreshFilter = null;
+ cacheFilter = CacheFilters.statAndData();
return this;
}
public CuratorCacheBuilder forSingleLevel()
{
singleNode = false;
- refreshFilter = new SingleLevelRefreshFilter(path);
- cacheFilter = new SingleLevelCacheFilter(path);
+ refreshFilter = RefreshFilters.singleLevel();
+ cacheFilter = CacheFilters.statAndData();
return this;
}
public CuratorCacheBuilder forTree()
{
singleNode = false;
- refreshFilter = new TreeRefreshFilter();
+ refreshFilter = RefreshFilters.tree();
+ cacheFilter = CacheFilters.statAndData();
return this;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index aa47363..303ebb1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -46,7 +46,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
{
@Override
- public boolean descend(String path)
+ public boolean descend(String mainPath, String checkPath)
{
return false;
}
@@ -157,7 +157,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
{
CacheAction cacheAction = (CacheAction)event.getContext();
CachedNode newNode = new CachedNode(event.getStat(), event.getData());
- CachedNode oldNode = cache.asMap().put(path, (cacheAction == CacheAction.PATH_ONLY) ? new CachedNode(event.getStat()) : newNode);
+ CachedNode oldNode = putNewNode(path, cacheAction, newNode);
if ( oldNode == null )
{
notifyListeners(CacheEvent.NODE_CREATED, path, newNode);
@@ -187,7 +187,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
};
- CacheAction cacheAction = cacheFilter.actionForPath(path);
+ CacheAction cacheAction = cacheFilter.actionForPath(basePath, path);
switch ( cacheAction )
{
case NOT_STORED:
@@ -196,8 +196,8 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
break;
}
- case PATH_ONLY:
- case PATH_AND_DATA:
+ case STAT_ONLY:
+ case STAT_AND_DATA:
{
try
{
@@ -212,7 +212,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
break;
}
- case PATH_AND_COMPRESSED_DATA:
+ case STAT_AND_COMPRESSED_DATA:
{
try
{
@@ -228,7 +228,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
}
- if ( refreshFilter.descend(path) )
+ if ( refreshFilter.descend(basePath, path) )
{
refresher.increment();
try
@@ -243,6 +243,39 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
}
+ private CachedNode putNewNode(String path, CacheAction cacheAction, CachedNode newNode)
+ {
+ CachedNode putNode;
+ switch ( cacheAction )
+ {
+ default:
+ case NOT_STORED:
+ {
+ throw new IllegalStateException(String.format("Should not be here with action %s for path %s", cacheAction, path));
+ }
+
+ case PATH_ONLY:
+ {
+ putNode = nullNode;
+ break;
+ }
+
+ case STAT_ONLY:
+ {
+ putNode = new CachedNode(newNode.getStat());
+ break;
+ }
+
+ case STAT_AND_DATA:
+ case STAT_AND_COMPRESSED_DATA:
+ {
+ putNode = newNode;
+ break;
+ }
+ }
+ return cache.asMap().put(path, putNode);
+ }
+
private void decrementOutstanding(SettableFuture<Boolean> task, AtomicInteger outstandingCount)
{
if ( outstandingCount.decrementAndGet() <= 0 )
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index b78480a..b4a7b16 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -179,7 +179,7 @@ class InternalNodeCache extends CuratorCacheBase
}
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- switch ( cacheFilter.actionForPath(path) )
+ switch ( cacheFilter.actionForPath(path, path) )
{
default:
case NOT_STORED:
@@ -187,20 +187,20 @@ class InternalNodeCache extends CuratorCacheBase
throw new UnsupportedOperationException("Single node cache does not support action: IGNORE");
}
- case PATH_ONLY:
+ case STAT_ONLY:
{
setNewData(nullNode);
break;
}
- case PATH_AND_DATA:
+ case STAT_AND_DATA:
{
refresher.increment();
client.getData().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);
break;
}
- case PATH_AND_COMPRESSED_DATA:
+ case STAT_AND_COMPRESSED_DATA:
{
refresher.increment();
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
index b68402d..97e86da 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
@@ -21,8 +21,8 @@ package org.apache.curator.framework.recipes.watch;
public class NoDataCacheFilter implements CacheFilter
{
@Override
- public CacheAction actionForPath(String path)
+ public CacheAction actionForPath(String mainPath, String checkPath)
{
- return CacheAction.PATH_ONLY;
+ return CacheAction.STAT_ONLY;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
index 5b7fa7f..8a29826 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
@@ -20,5 +20,5 @@ package org.apache.curator.framework.recipes.watch;
public interface RefreshFilter
{
- boolean descend(String path);
+ boolean descend(String mainPath, String checkPath);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
new file mode 100644
index 0000000..c1581c4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -0,0 +1,36 @@
+package org.apache.curator.framework.recipes.watch;
+
+public class RefreshFilters
+{
+ private static final RefreshFilter singleLevel = new RefreshFilter()
+ {
+ @Override
+ public boolean descend(String mainPath, String checkPath)
+ {
+ return mainPath.equals(checkPath);
+ }
+ };
+
+ private static final RefreshFilter tree = new RefreshFilter()
+ {
+ @Override
+ public boolean descend(String mainPath, String checkPath)
+ {
+ return true;
+ }
+ };
+
+ public static RefreshFilter singleLevel()
+ {
+ return singleLevel;
+ }
+
+ public static RefreshFilter tree()
+ {
+ return tree;
+ }
+
+ private RefreshFilters()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
deleted file mode 100644
index 615a292..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-import org.apache.curator.utils.ZKPaths;
-
-public class SingleLevelCacheFilter implements CacheFilter
-{
- private final String levelPath;
- private final CacheAction defaultAction;
- private final boolean isRoot;
-
- public SingleLevelCacheFilter(String levelPath)
- {
- this(levelPath, CacheAction.PATH_AND_DATA);
- }
-
- public SingleLevelCacheFilter(String levelPath, CacheAction defaultAction)
- {
- this.levelPath = levelPath;
- this.defaultAction = defaultAction;
- isRoot = levelPath.equals(ZKPaths.PATH_SEPARATOR);
- }
-
- @Override
- public CacheAction actionForPath(String path)
- {
- if ( isRoot && path.equals(ZKPaths.PATH_SEPARATOR) ) // special case. The parent of "/" is "/"
- {
- return CacheAction.NOT_STORED;
- }
- else if ( ZKPaths.getPathAndNode(path).getPath().equals(levelPath) )
- {
- return actionForMatchedPath();
- }
- return CacheAction.NOT_STORED;
- }
-
- protected CacheAction actionForMatchedPath()
- {
- return defaultAction;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
deleted file mode 100644
index f023a3e..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-import java.util.Objects;
-
-public class SingleLevelRefreshFilter implements RefreshFilter
-{
- private final String basePath;
-
- public SingleLevelRefreshFilter(String basePath)
- {
- this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
- }
-
- @Override
- public boolean descend(String path)
- {
- return basePath.equals(path);
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
new file mode 100644
index 0000000..e27355d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+
+class StandardCacheFilter implements CacheFilter
+{
+ private final CacheAction cacheAction;
+
+ StandardCacheFilter(CacheAction cacheAction)
+ {
+ this.cacheAction = cacheAction;
+ }
+
+ @Override
+ public CacheAction actionForPath(String mainPath, String checkPath)
+ {
+ boolean mainPathIsRoot = mainPath.endsWith(ZKPaths.PATH_SEPARATOR);
+ if ( mainPathIsRoot && checkPath.equals(ZKPaths.PATH_SEPARATOR) ) // special case. The parent of "/" is "/"
+ {
+ return CacheAction.NOT_STORED;
+ }
+ else if ( ZKPaths.getPathAndNode(checkPath).getPath().equals(mainPath) )
+ {
+ return cacheAction;
+ }
+ return CacheAction.NOT_STORED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
deleted file mode 100644
index bba145a..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-public class StatsOnlyCacheFilter implements CacheFilter
-{
- @Override
- public CacheAction actionForPath(String path)
- {
- return CacheAction.PATH_ONLY;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
deleted file mode 100644
index dda49b4..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-public class TreeRefreshFilter implements RefreshFilter
-{
- @Override
- public boolean descend(String path)
- {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
index 4ccf3b7..c3cc327 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -284,8 +284,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
final CountDownLatch updatedLatch = new CountDownLatch(1);
final CountDownLatch addedLatch = new CountDownLatch(1);
client.create().creatingParentsIfNeeded().forPath("/test");
- SingleLevelCacheFilter cacheFilter = new SingleLevelCacheFilter("/test", CacheAction.PATH_ONLY);
- cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(cacheFilter).build();
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(CacheFilters.statOnly()).build();
cache.getListenable().addListener(new CacheListener()
{
@Override
@@ -724,7 +723,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
{
- CacheFilter cacheFilter = new SingleLevelCacheFilter("/test", cacheData ? CacheAction.PATH_AND_DATA : CacheAction.PATH_ONLY);
+ CacheFilter cacheFilter = cacheData ? CacheFilters.statAndData() : CacheFilters.statOnly();
try (CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(cacheFilter).build() )
{
final CountDownLatch latch = new CountDownLatch(2);
[2/5] curator git commit: Added a composite cache
Posted by ra...@apache.org.
Added a composite cache
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/313fd7d4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/313fd7d4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/313fd7d4
Branch: refs/heads/persistent-watch
Commit: 313fd7d46ccede6bbc9ac1feb0b5a2099fce7a6d
Parents: 5b0a9f5
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 15:12:04 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 15:12:04 2016 -0500
----------------------------------------------------------------------
.../framework/recipes/watch/CacheFilters.java | 18 ++
.../recipes/watch/CompositeCountDownLatch.java | 82 ++++++
.../recipes/watch/CompositeCuratorCache.java | 282 +++++++++++++++++++
.../watch/CompositeCuratorCacheProxy.java | 53 ++++
.../framework/recipes/watch/CuratorCache.java | 11 +-
.../recipes/watch/CuratorCacheBase.java | 5 +-
.../recipes/watch/CuratorCacheBuilder.java | 8 +
.../recipes/watch/InternalCuratorCache.java | 4 +-
.../framework/recipes/watch/RefreshFilters.java | 18 ++
9 files changed, 468 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
index 171dea4..6d7a119 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.watch;
public class CacheFilters
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
new file mode 100644
index 0000000..5c75622
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+class CompositeCountDownLatch extends CountDownLatch
+{
+ private final Collection<CountDownLatch> latches;
+
+ CompositeCountDownLatch(Collection<CountDownLatch> latches)
+ {
+ super(0);
+ this.latches = ImmutableList.copyOf(latches);
+ }
+
+ @Override
+ public void await() throws InterruptedException
+ {
+ for ( CountDownLatch latch : latches )
+ {
+ latch.await();
+ }
+ }
+
+ @Override
+ public boolean await(final long timeout, TimeUnit unit) throws InterruptedException
+ {
+ long timeoutMs = unit.toMillis(timeout);
+ for ( CountDownLatch latch : latches )
+ {
+ if ( timeoutMs < 0 )
+ {
+ return false;
+ }
+ long start = System.currentTimeMillis();
+ if ( !latch.await(timeoutMs, TimeUnit.MILLISECONDS) )
+ {
+ return false;
+ }
+ long elapsed = System.currentTimeMillis() - start;
+ timeoutMs -= elapsed;
+ }
+ return true;
+ }
+
+ @Override
+ public void countDown()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCount()
+ {
+ long count = 0;
+ for ( CountDownLatch latch : latches )
+ {
+ count += latch.getCount();
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
new file mode 100644
index 0000000..c0163c5
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+public class CompositeCuratorCache implements CuratorCache
+{
+ private final Map<String, CuratorCache> caches;
+ private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+ private final CacheListener listener = new CacheListener()
+ {
+ @Override
+ public void process(final CacheEvent event, final String path, final CachedNode affectedNode)
+ {
+ Function<CacheListener, Void> proc = new Function<CacheListener, Void>()
+ {
+ @Override
+ public Void apply(CacheListener listener)
+ {
+ listener.process(event, path, affectedNode);
+ return null;
+ }
+ };
+ listeners.forEach(proc);
+ }
+ };
+
+ public CompositeCuratorCache(CuratorCache... caches)
+ {
+ this(toMap(Arrays.asList(caches)));
+ }
+
+ public CompositeCuratorCache(Collection<CuratorCache> caches)
+ {
+ this(toMap(caches));
+ }
+
+ private static Map<String, CuratorCache> toMap(Collection<CuratorCache> caches)
+ {
+ Map<String, CuratorCache> map = new HashMap<>();
+ for ( CuratorCache cache : caches )
+ {
+ map.put(Integer.toString(map.size()), cache);
+ }
+ return map;
+ }
+
+ public CompositeCuratorCache(Map<String, CuratorCache> caches)
+ {
+ this.caches = ImmutableMap.copyOf(caches);
+ }
+
+ public Iterable<String> cacheKeys()
+ {
+ return caches.keySet();
+ }
+
+ public CuratorCache getCache(String key)
+ {
+ return caches.get(key);
+ }
+
+ @Override
+ public CountDownLatch start()
+ {
+ List<CountDownLatch> latches = new ArrayList<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ latches.add(cache.start());
+ }
+ return new CompositeCountDownLatch(latches);
+ }
+
+ @Override
+ public void close()
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ cache.getListenable().removeListener(listener);
+ cache.close();
+ }
+ }
+
+ @Override
+ public Listenable<CacheListener> getListenable()
+ {
+ return listeners;
+ }
+
+ @Override
+ public CountDownLatch refreshAll()
+ {
+ List<CountDownLatch> latches = new ArrayList<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ latches.add(cache.refreshAll());
+ }
+ return new CompositeCountDownLatch(latches);
+ }
+
+ @Override
+ public CountDownLatch refresh(String path)
+ {
+ List<CountDownLatch> latches = new ArrayList<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ latches.add(cache.refresh(path));
+ }
+ return new CompositeCountDownLatch(latches);
+ }
+
+ @Override
+ public boolean clear(String path)
+ {
+ boolean cleared = false;
+ for ( CuratorCache cache : caches.values() )
+ {
+ if ( cache.clear(path) )
+ {
+ cleared = true;
+ }
+ }
+ return cleared;
+ }
+
+ @Override
+ public void clearAll()
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ cache.clearAll();
+ }
+ }
+
+ @Override
+ public boolean exists(String path)
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ if ( cache.exists(path) )
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Set<String> paths()
+ {
+ Set<String> paths = new HashSet<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ cache.clearAll();
+ }
+ return paths;
+ }
+
+ @Override
+ public CachedNode get(String path)
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ CachedNode node = cache.get(path);
+ if ( node != null )
+ {
+ return node;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Iterable<CachedNode> getAll()
+ {
+ List<Iterable<CachedNode>> nodes = new ArrayList<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ nodes.add(cache.getAll());
+ }
+ return Iterables.concat(nodes);
+ }
+
+ @Override
+ public Iterable<Map.Entry<String, CachedNode>> entries()
+ {
+ List<Iterable<Map.Entry<String, CachedNode>>> nodes = new ArrayList<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ nodes.add(cache.entries());
+ }
+ return Iterables.concat(nodes);
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ if ( !cache.isEmpty() )
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int size()
+ {
+ long size = 0;
+ for ( CuratorCache cache : caches.values() )
+ {
+ size += cache.size();
+ }
+ return (int)Math.min(size, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void clearDataBytes(String path)
+ {
+ for ( CuratorCache cache : caches.values() )
+ {
+ cache.clearDataBytes(path);
+ }
+ }
+
+ @Override
+ public boolean clearDataBytes(String path, int ifVersion)
+ {
+ boolean cleared = false;
+ for ( CuratorCache cache : caches.values() )
+ {
+ if ( cache.clearDataBytes(path, ifVersion) )
+ {
+ cleared = true;
+ }
+ }
+ return cleared;
+ }
+
+ @Override
+ public long refreshCount()
+ {
+ long count = 0;
+ for ( CuratorCache cache : caches.values() )
+ {
+ count = Math.min(count, cache.refreshCount());
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
new file mode 100644
index 0000000..17ca449
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class CompositeCuratorCacheProxy extends CompositeCuratorCache
+{
+ public CompositeCuratorCacheProxy(CuratorCache... caches)
+ {
+ super(caches);
+ }
+
+ public CompositeCuratorCacheProxy(Collection<CuratorCache> caches)
+ {
+ super(caches);
+ }
+
+ public CompositeCuratorCacheProxy(Map<String, CuratorCache> caches)
+ {
+ super(caches);
+ }
+
+ @Override
+ public CountDownLatch start()
+ {
+ return new CountDownLatch(0);
+ }
+
+ @Override
+ public void close()
+ {
+ // NOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index c8a6ea2..164ce29 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.watch;
import org.apache.curator.framework.listen.Listenable;
import java.io.Closeable;
-import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -104,20 +103,18 @@ public interface CuratorCache extends Closeable
CachedNode get(String path);
/**
- * Returns the collection of node values in the cache. The returned set behaves in the same manner
- * as {@link ConcurrentHashMap#values()}
+ * Returns the collection of node values in the cache.
*
* @return node values
*/
- Collection<CachedNode> getAll();
+ Iterable<CachedNode> getAll();
/**
- * Returns the collection of node entries in the cache. The returned set behaves in the same manner
- * as {@link ConcurrentHashMap#entrySet()}
+ * Returns the collection of node entries in the cache.
*
* @return node entries
*/
- Set<Map.Entry<String, CachedNode>> entries();
+ Iterable<Map.Entry<String, CachedNode>> entries();
/**
* Returns true if the cache is currently empty. Use the result only as a reference. Concurrent
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 4803e69..3075c3b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
-import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -107,13 +106,13 @@ abstract class CuratorCacheBase implements CuratorCache
}
@Override
- public final Collection<CachedNode> getAll()
+ public final Iterable<CachedNode> getAll()
{
return cache.asMap().values();
}
@Override
- public final Set<Map.Entry<String, CachedNode>> entries()
+ public final Iterable<Map.Entry<String, CachedNode>> entries()
{
return cache.asMap().entrySet();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 341ac23..df068c0 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -75,6 +75,14 @@ public class CuratorCacheBuilder
return this;
}
+ public CuratorCacheBuilder forFull(CacheAction cacheAction)
+ {
+ singleNode = false;
+ refreshFilter = RefreshFilters.tree();
+ cacheFilter = CacheFilters.full(cacheAction);
+ return this;
+ }
+
public CuratorCacheBuilder usingWeakValues()
{
cacheBuilder = cacheBuilder.weakValues();
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 303ebb1..950c336 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -124,9 +124,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
@Override
public CountDownLatch refresh(String path)
{
- Preconditions.checkArgument(path.startsWith(basePath), "Path is not this cache's tree: " + path);
-
- if ( isStarted() )
+ if ( isStarted() && path.startsWith(basePath) )
{
CountDownLatch latch = new CountDownLatch(1);
Refresher refresher = new Refresher(this, path, latch);
http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
index c1581c4..9ab1ca7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.watch;
public class RefreshFilters
[5/5] curator git commit: More test porting, refinements
Posted by ra...@apache.org.
More test porting, refinements
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/38c76631
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/38c76631
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/38c76631
Branch: refs/heads/persistent-watch
Commit: 38c766310432bd1d6b3f64d2778b3605df434e64
Parents: f8f5caf
Author: randgalt <ra...@apache.org>
Authored: Sat Dec 31 16:41:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Dec 31 16:41:10 2016 -0500
----------------------------------------------------------------------
.../framework/recipes/watch/CacheFilters.java | 19 +-
.../recipes/watch/CompositeCuratorCache.java | 15 +-
.../framework/recipes/watch/CuratorCache.java | 16 +-
.../recipes/watch/CuratorCacheBase.java | 26 +-
.../recipes/watch/CuratorCacheBuilder.java | 19 +-
.../recipes/watch/InternalCuratorCache.java | 22 +-
.../recipes/watch/NotifyingRefresher.java | 43 ++++
.../recipes/watch/PersistentWatcher.java | 20 +-
.../framework/recipes/watch/RefreshFilters.java | 25 ++
.../framework/recipes/watch/Refresher.java | 27 +-
.../recipes/watch/BaseTestTreeCache.java | 167 +++++++++++++
.../framework/recipes/watch/EventAndNode.java | 39 +++
.../recipes/watch/TestEventOrdering.java | 175 +++++++++++++
.../TestSingleLevelCuratorCacheInCluster.java | 117 +++++++++
.../framework/recipes/watch/TestTreeCache.java | 244 +++++++++++++++++++
15 files changed, 924 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
index 6d7a119..dec16b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -24,6 +24,14 @@ public class CacheFilters
private static final CacheFilter compressedStatAndData = new StandardCacheFilter(CacheAction.STAT_AND_COMPRESSED_DATA);
private static final CacheFilter statOnly = new StandardCacheFilter(CacheAction.STAT_ONLY);
private static final CacheFilter pathOnly = new StandardCacheFilter(CacheAction.PATH_ONLY);
+ private static final CacheFilter full = new CacheFilter()
+ {
+ @Override
+ public CacheAction actionForPath(String mainPath, String checkPath)
+ {
+ return CacheAction.STAT_AND_DATA;
+ }
+ };
public static CacheFilter statAndData()
{
@@ -45,16 +53,9 @@ public class CacheFilters
return pathOnly;
}
- public static CacheFilter full(final CacheAction cacheAction)
+ public static CacheFilter full()
{
- return new CacheFilter()
- {
- @Override
- public CacheAction actionForPath(String mainPath, String checkPath)
- {
- return cacheAction;
- }
- };
+ return full;
}
private CacheFilters()
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
index c0163c5..81373b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
@@ -176,12 +176,23 @@ public class CompositeCuratorCache implements CuratorCache
}
@Override
- public Set<String> paths()
+ public Collection<String> paths()
{
Set<String> paths = new HashSet<>();
for ( CuratorCache cache : caches.values() )
{
- cache.clearAll();
+ paths.addAll(cache.paths());
+ }
+ return paths;
+ }
+
+ @Override
+ public Collection<String> childNamesAtPath(String path)
+ {
+ Set<String> paths = new HashSet<>();
+ for ( CuratorCache cache : caches.values() )
+ {
+ paths.addAll(cache.childNamesAtPath(path));
}
return paths;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index 164ce29..db6e17f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -20,9 +20,8 @@ package org.apache.curator.framework.recipes.watch;
import org.apache.curator.framework.listen.Listenable;
import java.io.Closeable;
+import java.util.Collection;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
@@ -87,12 +86,19 @@ public interface CuratorCache extends Closeable
boolean exists(String path);
/**
- * Returns the set of paths in the cache. The returned set behaves in the same manner
- * as {@link ConcurrentHashMap#keySet()}
+ * Returns an immutable view of paths in the cache.
*
* @return set of paths
*/
- Set<String> paths();
+ Collection<String> paths();
+
+ /**
+ * Returns the set of child node names of the given node
+ *
+ * @param path node full path
+ * @return child names
+ */
+ Collection<String> childNamesAtPath(String path);
/**
* Return the node data stored for the path in the cache or null
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 3075c3b..1362679 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -20,12 +20,16 @@ package org.apache.curator.framework.recipes.watch;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
import com.google.common.cache.Cache;
+import com.google.common.collect.Collections2;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +42,7 @@ abstract class CuratorCacheBase implements CuratorCache
private final AtomicReference<CountDownLatch> initialRefreshLatch = new AtomicReference<>(new CountDownLatch(1));
private final boolean sendRefreshEvents;
private final AtomicInteger refreshCount = new AtomicInteger(0);
+ private Function<String, String> function;
protected boolean isStarted()
{
@@ -94,9 +99,24 @@ abstract class CuratorCacheBase implements CuratorCache
}
@Override
- public final Set<String> paths()
+ public final Collection<String> paths()
{
- return cache.asMap().keySet();
+ return Collections.unmodifiableCollection(cache.asMap().keySet());
+ }
+
+ @Override
+ public Collection<String> childNamesAtPath(final String basePath)
+ {
+ function = new Function<String, String>()
+ {
+ @Override
+ public String apply(String path)
+ {
+ ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+ return pathAndNode.getPath().equals(basePath) ? pathAndNode.getNode() : null;
+ }
+ };
+ return Collections2.filter(Collections2.transform(paths(), function), Predicates.notNull());
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index df068c0..31633f0 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -34,6 +34,7 @@ public class CuratorCacheBuilder
private boolean sendRefreshEvents = true;
private boolean refreshOnStart = true;
private CacheFilter cacheFilter = CacheFilters.statAndData();
+ private boolean sortChildren = true;
public static CuratorCacheBuilder builder(CuratorFramework client, String path)
{
@@ -48,7 +49,7 @@ public class CuratorCacheBuilder
Preconditions.checkState(refreshFilter == null, "Single node caches do not use RefreshFilters");
return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
}
- return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
+ return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart, sortChildren);
}
public CuratorCacheBuilder forSingleNode()
@@ -71,15 +72,7 @@ public class CuratorCacheBuilder
{
singleNode = false;
refreshFilter = RefreshFilters.tree();
- cacheFilter = CacheFilters.statAndData();
- return this;
- }
-
- public CuratorCacheBuilder forFull(CacheAction cacheAction)
- {
- singleNode = false;
- refreshFilter = RefreshFilters.tree();
- cacheFilter = CacheFilters.full(cacheAction);
+ cacheFilter = CacheFilters.full();
return this;
}
@@ -131,6 +124,12 @@ public class CuratorCacheBuilder
return this;
}
+ public CuratorCacheBuilder sortingChildren(boolean sortChildren)
+ {
+ this.sortChildren = sortChildren;
+ return this;
+ }
+
private CuratorCacheBuilder(CuratorFramework client, String path)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index a26f430..750545b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -29,6 +29,8 @@ import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
@@ -41,6 +43,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
private final String basePath;
private final CacheFilter cacheFilter;
private final RefreshFilter refreshFilter;
+ private final boolean sortChildren;
private static final CachedNode nullNode = new CachedNode();
private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
{
@@ -51,21 +54,23 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
};
- InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart)
+ InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren)
{
super(cache, sendRefreshEvents);
this.client = Objects.requireNonNull(client, "client cannot be null");
basePath = Objects.requireNonNull(path, "path cannot be null");
this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
this.refreshFilter = Objects.requireNonNull(refreshFilter, "primingFilter cannot be null");
+ this.sortChildren = sortChildren;
watcher = new PersistentWatcher(client, path)
{
@Override
protected void noteWatcherReset()
{
- if ( refreshOnStart || (refreshCount() > 0) )
+ long count = refreshCount();
+ if ( (refreshOnStart && (count == 0)) || (count > 0) )
{
- internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
+ internalRefresh(basePath, new NotifyingRefresher(InternalCuratorCache.this, basePath), refreshFilter);
}
}
};
@@ -108,7 +113,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
case NodeCreated:
case NodeDataChanged:
{
- internalRefresh(event.getPath(), new Refresher(InternalCuratorCache.this, basePath), nopRefreshFilter);
+ internalRefresh(event.getPath(), new Refresher(InternalCuratorCache.this), nopRefreshFilter);
break;
}
}
@@ -126,7 +131,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
if ( isStarted() && path.startsWith(basePath) )
{
CountDownLatch latch = new CountDownLatch(1);
- Refresher refresher = new Refresher(this, path, latch);
+ Refresher refresher = new NotifyingRefresher(this, path, latch);
internalRefresh(path, refresher, refreshFilter);
return latch;
}
@@ -166,7 +171,12 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
else if ( event.getType() == CuratorEventType.CHILDREN )
{
- for ( String child : event.getChildren() )
+ List<String> children = event.getChildren();
+ if ( sortChildren )
+ {
+ Collections.sort(children);
+ }
+ for ( String child : children )
{
internalRefresh(ZKPaths.makePath(path, child), refresher, refreshFilter);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
new file mode 100644
index 0000000..c34c94b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import java.util.concurrent.CountDownLatch;
+
+class NotifyingRefresher extends Refresher
+{
+ private final String refreshPath;
+
+ NotifyingRefresher(CuratorCacheBase cacheBase, String refreshPath)
+ {
+ this(cacheBase, refreshPath, null);
+ }
+
+ NotifyingRefresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
+ {
+ super(cacheBase, latch);
+ this.refreshPath = refreshPath;
+ }
+
+ protected void completed()
+ {
+ cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath, cacheBase.get(refreshPath));
+ super.completed();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 9bee7b1..ff7dc82 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -33,12 +33,14 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.Closeable;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class PersistentWatcher implements Closeable
{
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
+ private final AtomicBoolean isSet = new AtomicBoolean(false);
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -48,6 +50,10 @@ public class PersistentWatcher implements Closeable
{
reset();
}
+ else if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
+ {
+ isSet.set(false);
+ }
}
};
private final Watcher watcher = new Watcher()
@@ -74,9 +80,19 @@ public class PersistentWatcher implements Closeable
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
- if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+ if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) )
{
- noteWatcherReset();
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ if ( isSet.compareAndSet(false, true) )
+ {
+ noteWatcherReset();
+ }
+ }
+ else
+ {
+ isSet.set(false);
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
index 9ab1ca7..a03d000 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -18,6 +18,8 @@
*/
package org.apache.curator.framework.recipes.watch;
+import org.apache.zookeeper.server.PathIterator;
+
public class RefreshFilters
{
private static final RefreshFilter singleLevel = new RefreshFilter()
@@ -48,6 +50,29 @@ public class RefreshFilters
return tree;
}
+ public static RefreshFilter maxDepth(final int maxDepth)
+ {
+ return new RefreshFilter()
+ {
+ @Override
+ public boolean descend(String mainPath, String checkPath)
+ {
+ PathIterator pathIterator = new PathIterator(checkPath);
+ int thisDepth = 1;
+ while ( pathIterator.hasNext() )
+ {
+ String thisParent = pathIterator.next();
+ if ( thisParent.equals(mainPath) )
+ {
+ break;
+ }
+ ++thisDepth;
+ }
+ return (thisDepth <= maxDepth);
+ }
+ };
+ }
+
private RefreshFilters()
{
}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
index fa2ae43..d9db264 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
@@ -23,21 +23,18 @@ import java.util.concurrent.atomic.AtomicInteger;
class Refresher
{
- private final CuratorCacheBase cacheBase;
- private final String refreshPath;
+ protected final CuratorCacheBase cacheBase;
private final CountDownLatch latch;
private final AtomicInteger count = new AtomicInteger(0);
- public Refresher(CuratorCacheBase cacheBase, String refreshPath)
+ Refresher(CuratorCacheBase cacheBase)
{
- this(cacheBase, refreshPath, null);
+ this(cacheBase, null);
}
- Refresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
+ Refresher(CuratorCacheBase cacheBase, CountDownLatch latch)
{
-
this.cacheBase = cacheBase;
- this.refreshPath = refreshPath;
this.latch = latch;
}
@@ -50,12 +47,16 @@ class Refresher
{
if ( count.decrementAndGet() <= 0 )
{
- cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath, cacheBase.get(refreshPath));
- if ( latch != null )
- {
- latch.countDown();
- }
- cacheBase.incrementRefreshCount();
+ completed();
+ }
+ }
+
+ protected void completed()
+ {
+ if ( latch != null )
+ {
+ latch.countDown();
}
+ cacheBase.incrementRefreshCount();
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
new file mode 100644
index 0000000..ab2e999
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BaseTestTreeCache extends BaseClassForTests
+{
+ protected CuratorFramework client;
+ protected CuratorCache cache;
+ private final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
+ private final BlockingQueue<EventAndNode> events = new LinkedBlockingQueue<>();
+ private final Timing timing = new Timing();
+
+ /**
+ * Automatically records all events into an easily testable event stream.
+ */
+ final CacheListener eventListener = new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path, CachedNode affectedNode)
+ {
+ if ( path.startsWith("/zookeeper") )
+ {
+ return;
+ }
+ events.add(new EventAndNode(event, path, affectedNode));
+ }
+ };
+
+ /**
+ * Construct a TreeCache that records exceptions and automatically listens.
+ */
+ protected CuratorCache newTreeCacheWithListeners(CuratorFramework client, String path)
+ {
+ CuratorCache result = CuratorCacheBuilder.builder(client, path).forTree().build();
+ result.getListenable().addListener(eventListener);
+ return result;
+ }
+
+ /**
+ * Finish constructing a TreeCache that records exceptions and automatically listens.
+ */
+ protected CuratorCache buildWithListeners(CuratorCacheBuilder builder)
+ {
+ CuratorCache result = builder.build();
+ result.getListenable().addListener(eventListener);
+ return result;
+ }
+
+ @Override
+ @BeforeMethod
+ public void setup() throws Exception
+ {
+ super.setup();
+ initCuratorFramework();
+ }
+
+ void initCuratorFramework()
+ {
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ }
+
+ @Override
+ @AfterMethod
+ public void teardown() throws Exception
+ {
+ try
+ {
+ try
+ {
+ Assert.assertFalse(hadBackgroundException.get(), "Background exceptions were thrown, see stderr for details");
+ assertNoMoreEvents();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+ finally
+ {
+ super.teardown();
+ }
+ }
+
+ /**
+ * Asserts the event queue is empty.
+ */
+ void assertNoMoreEvents() throws InterruptedException
+ {
+ timing.sleepABit();
+ Assert.assertTrue(events.isEmpty(), String.format("Expected no events, found %d; first event: %s", events.size(), events.peek()));
+ }
+
+ /**
+ * Asserts the given event is next in the queue, and consumes it from the queue.
+ */
+ EventAndNode assertEvent(CacheEvent expectedType) throws InterruptedException
+ {
+ return assertEvent(expectedType, null);
+ }
+
+ /**
+ * Asserts the given event is next in the queue, and consumes it from the queue.
+ */
+ EventAndNode assertEvent(CacheEvent expectedType, String expectedPath) throws InterruptedException
+ {
+ return assertEvent(expectedType, expectedPath, null);
+ }
+
+ /**
+ * Asserts the given event is next in the queue, and consumes it from the queue.
+ */
+ EventAndNode assertEvent(CacheEvent expectedType, String expectedPath, byte[] expectedData) throws InterruptedException
+ {
+ EventAndNode event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(event, String.format("Expected type: %s, path: %s", expectedType, expectedPath));
+
+ String message = event.toString();
+ Assert.assertEquals(event.type, expectedType, message);
+ if ( expectedPath == null )
+ {
+ Assert.assertEquals(expectedType, event.type);
+ }
+ else
+ {
+ Assert.assertNotNull(event.node, message);
+ Assert.assertEquals(event.path, expectedPath, message);
+ }
+ if ( expectedData != null )
+ {
+ Assert.assertEquals(event.node.getData(), expectedData, message);
+ }
+ return event;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
new file mode 100644
index 0000000..eb86216
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+class EventAndNode
+{
+ final CacheEvent type;
+ final String path;
+ final CachedNode node;
+
+ EventAndNode(CacheEvent type, String path, CachedNode node)
+ {
+ this.type = type;
+ this.path = path;
+ this.node = node;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "EventAndNode{" + "type=" + type + ", path='" + path + '\'' + ", node=" + node + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
new file mode 100644
index 0000000..46ccf0a
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class TestEventOrdering extends BaseClassForTests
+{
+ private final Timing timing = new Timing();
+ private final long start = System.currentTimeMillis();
+
+ private static final int THREAD_QTY = 100;
+ private static final int ITERATIONS = 100;
+ private static final int NODE_QTY = 10;
+
+ public enum EventType
+ {
+ ADDED,
+ DELETED
+ }
+
+ public static class Event
+ {
+ public final EventType eventType;
+ public final String path;
+ public final long time = System.currentTimeMillis();
+
+ public Event(EventType eventType, String path)
+ {
+ this.eventType = eventType;
+ this.path = path;
+ }
+ }
+
+ @Test
+ public void testEventOrdering() throws Exception
+ {
+ ExecutorService executorService = Executors.newFixedThreadPool(THREAD_QTY);
+ BlockingQueue<Event> events = Queues.newLinkedBlockingQueue();
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ CuratorCache cache = null;
+ try
+ {
+ client.start();
+ client.create().forPath("/root");
+ cache = CuratorCacheBuilder.builder(client, "/root").build();
+
+ final Random random = new Random();
+ final Callable<Void> task = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ for ( int i = 0; i < ITERATIONS; ++i )
+ {
+ String node = "/root/" + random.nextInt(NODE_QTY);
+ try
+ {
+ switch ( random.nextInt(3) )
+ {
+ default:
+ case 0:
+ client.create().forPath(node);
+ break;
+
+ case 1:
+ client.setData().forPath(node, "new".getBytes());
+ break;
+
+ case 2:
+ client.delete().forPath(node);
+ break;
+ }
+ }
+ catch ( KeeperException ignore )
+ {
+ // ignore
+ }
+ }
+ return null;
+ }
+ };
+
+ final CountDownLatch latch = new CountDownLatch(THREAD_QTY);
+ for ( int i = 0; i < THREAD_QTY; ++i )
+ {
+ Callable<Void> wrapped = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ return task.call();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+ };
+ executorService.submit(wrapped);
+ }
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ timing.sleepABit();
+
+ List<Event> localEvents = Lists.newArrayList();
+ int eventSuggestedQty = 0;
+ while ( events.size() > 0 )
+ {
+ Event event = events.take();
+ localEvents.add(event);
+ eventSuggestedQty += (event.eventType == EventType.ADDED) ? 1 : -1;
+ }
+ int actualQty = cache.size();
+ Assert.assertEquals(actualQty, eventSuggestedQty, String.format("actual %s expected %s:\n %s", actualQty, eventSuggestedQty, asString(localEvents)));
+ }
+ finally
+ {
+ executorService.shutdownNow();
+ //noinspection ThrowFromFinallyBlock
+ executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS);
+ CloseableUtils.closeQuietly(cache);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ private String asString(List<Event> events)
+ {
+ int qty = 0;
+ StringBuilder str = new StringBuilder();
+ for ( Event event : events )
+ {
+ qty += (event.eventType == EventType.ADDED) ? 1 : -1;
+ str.append(event.eventType).append(" ").append(event.path).append(" @ ").append(event.time - start).append(' ').append(qty);
+ str.append("\n");
+ }
+ return str.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
new file mode 100644
index 0000000..3153764
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestSingleLevelCuratorCacheInCluster
+{
+ private static final Timing timing = new Timing();
+
+ @Test
+ public void testServerLoss() throws Exception
+ {
+ CuratorFramework client = null;
+ CuratorCache cache = null;
+ TestingCluster cluster = new TestingCluster(3);
+ try
+ {
+ cluster.start();
+
+ client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+
+ final CountDownLatch resetLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final CountDownLatch deleteLatch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(3);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.SUSPENDED )
+ {
+ resetLatch.countDown();
+ }
+ if ( newState.isConnected() )
+ {
+ reconnectLatch.countDown();
+ }
+ }
+ });
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path, CachedNode affectedNode)
+ {
+ if ( event == CacheEvent.NODE_CREATED )
+ {
+ latch.countDown();
+ }
+ else if ( event == CacheEvent.NODE_DELETED )
+ {
+ deleteLatch.countDown();
+ }
+ }
+ });
+ cache.start();
+
+ client.create().forPath("/test/one");
+ client.create().forPath("/test/two");
+ client.create().forPath("/test/three");
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ cluster.killServer(connectionInstance);
+
+ Assert.assertTrue(timing.awaitLatch(resetLatch));
+ Assert.assertEquals(cache.size(), 3);
+
+ Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+ Assert.assertEquals(cache.size(), 3);
+
+ Assert.assertEquals(deleteLatch.getCount(), 1);
+ client.delete().forPath("/test/two");
+ Assert.assertTrue(timing.awaitLatch(deleteLatch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(cluster);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
new file mode 100644
index 0000000..b4c5765
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCache extends BaseTestTreeCache
+{
+ @Test
+ public void testSelector() throws Exception
+ {
+ client.create().forPath("/root");
+ client.create().forPath("/root/n1-a");
+ client.create().forPath("/root/n1-b");
+ client.create().forPath("/root/n1-b/n2-a");
+ client.create().forPath("/root/n1-b/n2-b");
+ client.create().forPath("/root/n1-b/n2-b/n3-a");
+ client.create().forPath("/root/n1-c");
+ client.create().forPath("/root/n1-d");
+
+ CacheFilter cacheFilter = new CacheFilter()
+ {
+ @Override
+ public CacheAction actionForPath(String mainPath, String checkPath)
+ {
+ if ( checkPath.equals("/root/n1-c") )
+ {
+ return CacheAction.NOT_STORED;
+ }
+ return CacheAction.STAT_AND_DATA;
+ }
+ };
+ RefreshFilter refreshFilter = new RefreshFilter()
+ {
+ @Override
+ public boolean descend(String mainPath, String checkPath)
+ {
+ return !checkPath.equals("/root/n1-b/n2-b");
+ }
+ };
+ cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheFilter(cacheFilter).withRefreshFilter(refreshFilter));
+ cache.start();
+
+ assertEvent(CacheEvent.NODE_CREATED, "/root");
+ assertEvent(CacheEvent.NODE_CREATED, "/root/n1-a");
+ assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b");
+ assertEvent(CacheEvent.NODE_CREATED, "/root/n1-d");
+ assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b/n2-a");
+ assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b/n2-b");
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartup() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ cache = newTreeCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/1", "one".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/2", "two".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/3", "three".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/2/sub", "two-sub".getBytes());
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test")), Sets.newHashSet("1", "2", "3"));
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/1")), Sets.newHashSet());
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/2")), Sets.newHashSet("sub"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testStartEmpty() throws Exception
+ {
+ cache = newTreeCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+
+ client.create().forPath("/test");
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartEmptyDeeper() throws Exception
+ {
+ cache = newTreeCacheWithListeners(client, "/test/foo/bar");
+ cache.start();
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+
+ client.create().creatingParentsIfNeeded().forPath("/test/foo");
+ assertNoMoreEvents();
+ client.create().forPath("/test/foo/bar");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDepth0() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test").forTree().withRefreshFilter(RefreshFilters.maxDepth(0)));
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/1"));
+ Assert.assertEquals(cache.childNamesAtPath("/test/1").size(), 0);
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testDepth1() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/1", "one".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/2", "two".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/3", "three".getBytes());
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test")), Sets.newHashSet("1", "2", "3"));
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/1")), Sets.newHashSet());
+ Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/2")), Sets.newHashSet());
+ Assert.assertNull(cache.get("/test/2/sub"));
+ Assert.assertEquals(cache.childNamesAtPath("/test/2/sub").size(), 0);
+ Assert.assertEquals(cache.childNamesAtPath("/test/non_exist").size(), 0);
+ }
+
+ @Test
+ public void testDepth1Deeper() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo");
+ client.create().forPath("/test/foo/bar");
+ client.create().forPath("/test/foo/bar/1", "one".getBytes());
+ client.create().forPath("/test/foo/bar/2", "two".getBytes());
+ client.create().forPath("/test/foo/bar/3", "three".getBytes());
+ client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+ cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test/foo/bar").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/1", "one".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/2", "two".getBytes());
+ assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/3", "three".getBytes());
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testAsyncInitialPopulation() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newTreeCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/one");
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testFromRoot() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newTreeCacheWithListeners(client, "/");
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/");
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.NODE_CREATED, "/test/one");
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+
+ Assert.assertTrue(cache.childNamesAtPath("/").contains("test"));
+ Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childNamesAtPath("/test/one"), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testFromRootWithDepth() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+ cache.start();
+ assertEvent(CacheEvent.NODE_CREATED, "/");
+ assertEvent(CacheEvent.NODE_CREATED, "/test");
+ assertEvent(CacheEvent.CACHE_REFRESHED);
+ assertNoMoreEvents();
+
+ Assert.assertTrue(cache.childNamesAtPath("/").contains("test"));
+ Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/one"));
+ Assert.assertEquals(cache.childNamesAtPath("/test/one").size(), 0);
+ }
+}
[3/5] curator git commit: renamed rebuildTestExchanger
Posted by ra...@apache.org.
renamed rebuildTestExchanger
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1f0bdf92
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1f0bdf92
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1f0bdf92
Branch: refs/heads/persistent-watch
Commit: 1f0bdf9265e6f5bfb34520761649240209c17d72
Parents: 313fd7d
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 16:26:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 16:26:10 2016 -0500
----------------------------------------------------------------------
.../curator/framework/recipes/watch/InternalCuratorCache.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1f0bdf92/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 950c336..a26f430 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -19,7 +19,6 @@
package org.apache.curator.framework.recipes.watch;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.curator.framework.CuratorFramework;
@@ -135,7 +134,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
@VisibleForTesting
- volatile Exchanger<Object> rebuildTestExchanger;
+ volatile Exchanger<Object> debugRebuildTestExchanger;
private void internalRefresh(final String path, final Refresher refresher, final RefreshFilter refreshFilter)
{
@@ -178,9 +177,9 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
// TODO
}
refresher.decrement();
- if ( rebuildTestExchanger != null )
+ if ( debugRebuildTestExchanger != null )
{
- rebuildTestExchanger.exchange(new Object());
+ debugRebuildTestExchanger.exchange(new Object());
}
}
};
[4/5] curator git commit: finished ported tests
Posted by ra...@apache.org.
finished ported tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f8f5cafa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f8f5cafa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f8f5cafa
Branch: refs/heads/persistent-watch
Commit: f8f5cafa956da97c5fa177ac64ee003e955887da
Parents: 1f0bdf9
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 16:26:23 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 16:26:23 2016 -0500
----------------------------------------------------------------------
.../watch/TestSingleLevelCuratorCache.java | 211 +------------------
1 file changed, 4 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f8f5cafa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
index c3cc327..0305e55 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -24,14 +24,10 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.TestCleanState;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
@@ -41,7 +37,6 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.testng.AssertJUnit.assertNotNull;
@@ -433,7 +428,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
}
}
});
- ((InternalCuratorCache)cache).rebuildTestExchanger = new Exchanger<Object>();
+ ((InternalCuratorCache)cache).debugRebuildTestExchanger = new Exchanger<Object>();
ExecutorService service = Executors.newSingleThreadExecutor();
final AtomicReference<String> deletedPath = new AtomicReference<String>();
Future<Object> future = service.submit
@@ -443,7 +438,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
@Override
public Object call() throws Exception
{
- ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+ ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
// simulate another process adding a node while we're rebuilding
client.create().forPath("/test/test");
@@ -454,7 +449,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
client.delete().forPath("/test/bar");
deletedPath.set("/test/bar");
- ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+ ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
CachedNode cachedNode = null;
while ( cachedNode == null )
@@ -465,7 +460,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
Assert.assertEquals(cachedNode.getData(), "original".getBytes());
client.setData().forPath("/test/snafu", "grilled".getBytes());
- ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+ ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
return null;
}
@@ -677,50 +672,6 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
}
}
- //@Test
- public void testRebuildNode() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
-
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicInteger counter = new AtomicInteger();
- final Semaphore semaphore = new Semaphore(1);
- cache = new PathChildrenCache(client, "/test", true)
- {
- //@Override
- void getDataAndStat(String fullPath) throws Exception
- {
- semaphore.acquire();
- counter.incrementAndGet();
- //super.getDataAndStat(fullPath);
- latch.countDown();
- }
- };
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
- Assert.assertTrue(timing.awaitLatch(latch));
-
- int saveCounter = counter.get();
- client.setData().forPath("/test/one", "alt".getBytes());
- cache.rebuildNode("/test/one");
- Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
- Assert.assertEquals(saveCounter, counter.get());
-
- semaphore.release(1000);
- timing.sleepABit();
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
{
CacheFilter cacheFilter = cacheData ? CacheFilters.statAndData() : CacheFilters.statOnly();
@@ -805,158 +756,4 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
TestCleanState.closeAndTestClean(client);
}
}
-
- //@Test
- public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- client.create().forPath("/test");
-
- final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
- final ExecutorService exec = Executors.newSingleThreadExecutor();
- try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
- {
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- if ( event.getData().getPath().equals("/test/one") )
- {
- events.offer(event.getType());
- }
- }
- }
- );
- cache.start();
-
- final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
- try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
- {
- cache2.getListenable().addListener(
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- throws Exception
- {
- if ( event.getData().getPath().equals("/test/one") )
- {
- events2.offer(event.getType());
- }
- }
- }
- );
- cache2.start();
-
- client.create().forPath("/test/one", "hey there".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-
- client.setData().forPath("/test/one", "sup!".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
- Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
- Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
-
- client.delete().forPath("/test/one");
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
- }
- }
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- //@Test
- public void testDeleteNodeAfterCloseDoesntCallExecutor()
- throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- client.create().forPath("/test");
-
- final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
- try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
- {
- cache.start();
- client.create().forPath("/test/one", "hey there".getBytes());
-
- cache.rebuild();
- Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
- Assert.assertTrue(exec.isExecuteCalled());
-
- exec.setExecuteCalled(false);
- }
- Assert.assertFalse(exec.isExecuteCalled());
-
- client.delete().forPath("/test/one");
- timing.sleepABit();
- Assert.assertFalse(exec.isExecuteCalled());
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
-
- }
-
- /**
- * Tests the case where there's an outstanding operation being executed when the cache is
- * shut down. See CURATOR-121, this was causing misleading warning messages to be logged.
- */
- //@Test
- public void testInterruptedOperationOnShutdown() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
- client.start();
-
- try
- {
- final CountDownLatch latch = new CountDownLatch(1);
- try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
- @Override
- protected void handleException(Throwable e)
- {
- latch.countDown();
- }
- } )
- {
- cache.start();
-
-/*
- cache.offerOperation(new Operation()
- {
-
- @Override
- public void invoke() throws Exception
- {
- Thread.sleep(5000);
- }
- });
-*/
-
- Thread.sleep(1000);
-
- }
-
- latch.await(5, TimeUnit.SECONDS);
-
- Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
- }
}