You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/06 23:41:36 UTC
curator git commit: make sure ChildReaper always moves through all
registered nodes. Also,
add an optional check so that large nodes are never queried
Repository: curator
Updated Branches:
refs/heads/CURATOR-203 [created] 7fe94bb15
make sure ChildReaper always moves through all registered nodes. Also, add an optional check so that large nodes are never queried
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7fe94bb1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7fe94bb1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7fe94bb1
Branch: refs/heads/CURATOR-203
Commit: 7fe94bb1517fa32dcb2e1972ada9b7b493c1108b
Parents: 6a56c51
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 6 16:41:24 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 6 16:41:24 2015 -0500
----------------------------------------------------------------------
.../framework/recipes/locks/ChildReaper.java | 52 ++++-
.../recipes/locks/TestChildReaper.java | 195 +++++++++++++++----
2 files changed, 207 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 6a2c05a..ee5c414 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,13 +19,14 @@
*/
package org.apache.curator.framework.recipes.locks;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
-
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.utils.CloseableScheduledExecutorService;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
@@ -34,13 +36,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
/**
* Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
@@ -53,11 +56,13 @@ public class ChildReaper implements Closeable
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final CuratorFramework client;
private final Collection<String> paths = Sets.newConcurrentHashSet();
+ private volatile Iterator<String> pathIterator = null;
private final Reaper.Mode mode;
private final CloseableScheduledExecutorService executor;
private final int reapingThresholdMs;
private final LeaderLatch leaderLatch;
private final Set<String> lockSchema;
+ private final AtomicInteger maxChildren = new AtomicInteger(-1);
private volatile Future<?> task;
@@ -210,19 +215,54 @@ public class ChildReaper implements Closeable
return paths.remove(PathUtils.validatePath(path));
}
+ /**
+ * If a node has so many children that {@link CuratorFramework#getChildren()} will fail
+ * (due to jute.maxbuffer) it can cause connection instability. Set the max number of
+ * children here to prevent the path from being queried in these cases. The number should usually
+ * be: 1000000/average-node-name-length
+ *
+ * @param maxChildren max children
+ */
+ public void setMaxChildren(int maxChildren)
+ {
+ this.maxChildren.set(maxChildren);
+ }
+
public static ScheduledExecutorService newExecutorService()
{
return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
}
+ @VisibleForTesting
+ protected void warnMaxChildren(String path, Stat stat)
+ {
+ log.warn(String.format("Skipping %s as it has too many children: %d", path, stat.getNumChildren()));
+ }
+
private void doWork()
{
- if (shouldDoWork())
+ if ( shouldDoWork() )
{
- for ( String path : paths )
+ if ( (pathIterator == null) || !pathIterator.hasNext() )
+ {
+ pathIterator = paths.iterator();
+ }
+ while ( pathIterator.hasNext() )
{
+ String path = pathIterator.next();
try
{
+ int maxChildren = this.maxChildren.get();
+ if ( maxChildren > 0 )
+ {
+ Stat stat = client.checkExists().forPath(path);
+ if ( (stat != null) && (stat.getNumChildren() > maxChildren) )
+ {
+ warnMaxChildren(path, stat);
+ continue;
+ }
+ }
+
List<String> children = client.getChildren().forPath(path);
for ( String name : children )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/7fe94bb1/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
index d81bb3a..906d9d4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,35 +16,161 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
public class TestChildReaper extends BaseClassForTests
{
@Test
- public void testSomeNodes() throws Exception
+ public void testMaxChildren() throws Exception
+ {
+ server.close();
+
+ final int LARGE_QTY = 10000;
+
+ System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
+ server = new TestingServer();
+ try
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < LARGE_QTY; ++i )
+ {
+ if ( (i % 1000) == 0 )
+ {
+ System.out.println(i);
+ }
+ client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
+ }
+
+ try
+ {
+ client.getChildren().forPath("/big");
+ Assert.fail("Should have been a connection loss");
+ }
+ catch ( KeeperException.ConnectionLossException e )
+ {
+ // expected
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ reaper = new ChildReaper(client, "/big", Reaper.Mode.REAP_UNTIL_DELETE, 1)
+ {
+ @Override
+ protected void warnMaxChildren(String path, Stat stat)
+ {
+ latch.countDown();
+ super.warnMaxChildren(path, stat);
+ }
+ };
+ reaper.setMaxChildren(100);
+ reaper.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ finally
+ {
+ System.clearProperty("jute.maxbuffer");
+ }
+ }
+
+ @Test
+ public void testLargeNodes() throws Exception
{
+ server.close();
+
+ final int LARGE_QTY = 10000;
+ final int SMALL_QTY = 100;
+
+ System.setProperty("jute.maxbuffer", "" + LARGE_QTY);
+ server = new TestingServer();
+ try
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < LARGE_QTY; ++i )
+ {
+ if ( (i % 1000) == 0 )
+ {
+ System.out.println(i);
+ }
+ client.create().creatingParentsIfNeeded().forPath("/big/node-" + i);
+
+ if ( i < SMALL_QTY )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/small/node-" + i);
+ }
+ }
+
+ reaper = new ChildReaper(client, "/foo", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+ reaper.start();
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ reaper.addPath("/big");
+ reaper.addPath("/small");
+
+ int count = -1;
+ for ( int i = 0; (i < 10) && (count != 0); ++i )
+ {
+ timing.sleepABit();
+ count = client.checkExists().forPath("/small").getNumChildren();
+ }
+ Assert.assertEquals(count, 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ finally
+ {
+ System.clearProperty("jute.maxbuffer");
+ }
+ }
+
+ @Test
+ public void testSomeNodes() throws Exception
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
- Random r = new Random();
- int nonEmptyNodes = 0;
+ Random r = new Random();
+ int nonEmptyNodes = 0;
for ( int i = 0; i < 10; ++i )
{
client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
@@ -60,7 +186,7 @@ public class TestChildReaper extends BaseClassForTests
timing.forWaiting().sleepABit();
- Stat stat = client.checkExists().forPath("/test");
+ Stat stat = client.checkExists().forPath("/test");
Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
}
finally
@@ -71,11 +197,11 @@ public class TestChildReaper extends BaseClassForTests
}
@Test
- public void testSimple() throws Exception
+ public void testSimple() throws Exception
{
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
@@ -90,7 +216,7 @@ public class TestChildReaper extends BaseClassForTests
timing.forWaiting().sleepABit();
- Stat stat = client.checkExists().forPath("/test");
+ Stat stat = client.checkExists().forPath("/test");
Assert.assertEquals(stat.getNumChildren(), 0);
}
finally
@@ -101,11 +227,11 @@ public class TestChildReaper extends BaseClassForTests
}
@Test
- public void testLeaderElection() throws Exception
+ public void testLeaderElection() throws Exception
{
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
LeaderLatch otherLeader = null;
try
{
@@ -118,6 +244,7 @@ public class TestChildReaper extends BaseClassForTests
otherLeader = new LeaderLatch(client, "/test-leader");
otherLeader.start();
+ otherLeader.await();
reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
reaper.start();
@@ -125,7 +252,7 @@ public class TestChildReaper extends BaseClassForTests
timing.forWaiting().sleepABit();
//Should not have reaped anything at this point since otherLeader is still leader
- Stat stat = client.checkExists().forPath("/test");
+ Stat stat = client.checkExists().forPath("/test");
Assert.assertEquals(stat.getNumChildren(), 10);
CloseableUtils.closeQuietly(otherLeader);
@@ -138,7 +265,7 @@ public class TestChildReaper extends BaseClassForTests
finally
{
CloseableUtils.closeQuietly(reaper);
- if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED)
+ if ( otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED )
{
CloseableUtils.closeQuietly(otherLeader);
}
@@ -147,11 +274,11 @@ public class TestChildReaper extends BaseClassForTests
}
@Test
- public void testMultiPath() throws Exception
+ public void testMultiPath() throws Exception
{
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
@@ -169,7 +296,7 @@ public class TestChildReaper extends BaseClassForTests
timing.forWaiting().sleepABit();
- Stat stat = client.checkExists().forPath("/test1");
+ Stat stat = client.checkExists().forPath("/test1");
Assert.assertEquals(stat.getNumChildren(), 0);
stat = client.checkExists().forPath("/test2");
Assert.assertEquals(stat.getNumChildren(), 0);
@@ -184,11 +311,11 @@ public class TestChildReaper extends BaseClassForTests
}
@Test
- public void testNamespace() throws Exception
+ public void testNamespace() throws Exception
{
- Timing timing = new Timing();
- ChildReaper reaper = null;
- CuratorFramework client = CuratorFrameworkFactory.builder()
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.sessionTimeoutMs(timing.session())
.connectionTimeoutMs(timing.connection())
@@ -209,7 +336,7 @@ public class TestChildReaper extends BaseClassForTests
timing.forWaiting().sleepABit();
- Stat stat = client.checkExists().forPath("/test");
+ Stat stat = client.checkExists().forPath("/test");
Assert.assertEquals(stat.getNumChildren(), 0);
stat = client.usingNamespace(null).checkExists().forPath("/foo/test");