You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/25 17:59:47 UTC
git commit: Work around edge case with protected mode. Make sure
created node gets deleted if retries fail during connection loss
Updated Branches:
refs/heads/CURATOR-45 [created] 9705b4114
Work around edge case with protected mode. Make sure created node gets deleted if retries fail during connection loss
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9705b411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9705b411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9705b411
Branch: refs/heads/CURATOR-45
Commit: 9705b4114433b5776ea40c810fbdec1f8151e17d
Parents: 10eb1ef
Author: randgalt <ra...@apache.org>
Authored: Wed Sep 25 09:01:00 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Wed Sep 25 09:01:00 2013 -0700
----------------------------------------------------------------------
.../framework/imps/CreateBuilderImpl.java | 352 +++++++++++++------
.../recipes/leader/ChaosMonkeyCnxnFactory.java | 124 +++++++
.../recipes/leader/TestLeaderSelectorEdges.java | 255 ++++++++++++++
3 files changed, 615 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee99074..ebd342f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
@@ -23,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.curator.RetryLoop;
import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.OperationType;
@@ -41,20 +43,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
{
- private final CuratorFrameworkImpl client;
- private CreateMode createMode;
- private Backgrounding backgrounding;
- private boolean createParentsIfNeeded;
- private boolean doProtected;
- private boolean compress;
- private String protectedId;
- private ACLing acling;
+ private final CuratorFrameworkImpl client;
+ private CreateMode createMode;
+ private Backgrounding backgrounding;
+ private boolean createParentsIfNeeded;
+ private boolean doProtected;
+ private boolean compress;
+ private String protectedId;
+ private ACLing acling;
@VisibleForTesting
boolean failNextCreateForTesting = false;
@VisibleForTesting
- static final String PROTECTED_PREFIX = "_c_";
+ static final String PROTECTED_PREFIX = "_c_";
CreateBuilderImpl(CuratorFrameworkImpl client)
{
@@ -68,7 +70,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
protectedId = null;
}
- TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+ TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
{
return new TransactionCreateBuilder()
{
@@ -95,7 +97,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
@Override
public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
{
- String fixedPath = client.fixForNamespace(path);
+ String fixedPath = client.fixForNamespace(path);
transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
return curatorTransaction;
}
@@ -417,60 +419,85 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
data = client.getCompressionProvider().compress(givenPath, data);
}
- final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
+ final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
- String returnPath = null;
+ String returnPath = null;
if ( backgrounding.inBackground() )
{
pathInBackground(adjustedPath, data, givenPath);
}
else
{
- returnPath = pathInForeground(adjustedPath, data);
- returnPath = client.unfixForNamespace(returnPath);
+ String path = protectedPathInForeground(adjustedPath, data);
+ returnPath = client.unfixForNamespace(path);
}
return returnPath;
}
+ private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
+ {
+ try
+ {
+ return pathInForeground(adjustedPath, data);
+ }
+ catch ( KeeperException.ConnectionLossException e )
+ {
+ if ( protectedId != null )
+ {
+ /*
+ * CURATOR-45 : we don't know if the create operation was successful or not,
+ * register the znode to be sure it is deleted later.
+ */
+ findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
+ /*
+ * The current UUID is scheduled to be deleted, it is not safe to use it again.
+ * If this builder is used again later create a new UUID
+ */
+ protectedId = UUID.randomUUID().toString();
+ }
+ throw e;
+ }
+ }
+
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
- final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
+ final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
client.getZooKeeper().create
- (
- operationAndData.getData().getPath(),
- operationAndData.getData().getData(),
- acling.getAclList(operationAndData.getData().getPath()),
- createMode,
- new AsyncCallback.StringCallback()
- {
- @Override
- public void processResult(int rc, String path, Object ctx, String name)
+ (
+ operationAndData.getData().getPath(),
+ operationAndData.getData().getData(),
+ acling.getAclList(operationAndData.getData().getPath()),
+ createMode,
+ new AsyncCallback.StringCallback()
{
- trace.commit();
-
- if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
- {
- backgroundCreateParentsThenNode(operationAndData);
- }
- else
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name)
{
- sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+ trace.commit();
+
+ if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
+ {
+ backgroundCreateParentsThenNode(operationAndData);
+ }
+ else
+ {
+ sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+ }
}
- }
- },
- backgrounding.getContext()
- );
+ },
+ backgrounding.getContext()
+ );
}
- private String getProtectedPrefix()
+ private static String getProtectedPrefix(String protectedId)
{
return PROTECTED_PREFIX + protectedId + "-";
}
private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
{
- BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
+ BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
{
@Override
public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
@@ -486,7 +513,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
client.queueOperation(mainOperationAndData);
}
};
- OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
+ OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
client.queueOperation(parentOperation);
}
@@ -558,16 +585,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
private void pathInBackground(final String path, final byte[] data, final String givenPath)
{
final AtomicBoolean firstTime = new AtomicBoolean(true);
- OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
+ OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(),
+ new OperationAndData.ErrorCallback<PathAndBytes>()
+ {
+ public void retriesExhausted(OperationAndData<PathAndBytes> operationAndData)
+ {
+ if ( doProtected )
+ {
+ // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
+ findAndDeleteProtectedNodeInBackground(path, protectedId, null);
+ // assign a new id if this builder is used again later
+ protectedId = UUID.randomUUID().toString();
+ }
+ }
+ },
+ backgrounding.getContext())
{
@Override
void callPerformBackgroundOperation() throws Exception
{
- boolean callSuper = true;
- boolean localFirstTime = firstTime.getAndSet(false);
+ boolean callSuper = true;
+ boolean localFirstTime = firstTime.getAndSet(false);
if ( !localFirstTime && doProtected )
{
- String createdPath = findProtectedNodeInForeground(path);
+ String createdPath = null;
+ try
+ {
+ createdPath = findProtectedNodeInForeground(path);
+ }
+ catch ( KeeperException.ConnectionLossException e )
+ {
+ sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, this);
+ callSuper = false;
+ }
if ( createdPath != null )
{
try
@@ -600,117 +650,187 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
private String pathInForeground(final String path, final byte[] data) throws Exception
{
- TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
+ TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
- final AtomicBoolean firstTime = new AtomicBoolean(true);
- String returnPath = RetryLoop.callWithRetry
- (
- client.getZookeeperClient(),
- new Callable<String>()
- {
- @Override
- public String call() throws Exception
+ final AtomicBoolean firstTime = new AtomicBoolean(true);
+ String returnPath = RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<String>()
{
- boolean localFirstTime = firstTime.getAndSet(false);
-
- String createdPath = null;
- if ( !localFirstTime && doProtected )
+ @Override
+ public String call() throws Exception
{
- createdPath = findProtectedNodeInForeground(path);
- }
+ boolean localFirstTime = firstTime.getAndSet(false);
- if ( createdPath == null )
- {
- try
+ String createdPath = null;
+ if ( !localFirstTime && doProtected )
{
- createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+ createdPath = findProtectedNodeInForeground(path);
}
- catch ( KeeperException.NoNodeException e )
+
+ if ( createdPath == null )
{
- if ( createParentsIfNeeded )
+ try
{
- ZKPaths.mkdirs(client.getZooKeeper(), path, false);
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
}
- else
+ catch ( KeeperException.NoNodeException e )
{
- throw e;
+ if ( createParentsIfNeeded )
+ {
+ ZKPaths.mkdirs(client.getZooKeeper(), path, false);
+ createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+ }
+ else
+ {
+ throw e;
+ }
}
}
- }
- if ( failNextCreateForTesting )
- {
- failNextCreateForTesting = false;
- throw new KeeperException.ConnectionLossException();
+ if ( failNextCreateForTesting )
+ {
+ failNextCreateForTesting = false;
+ throw new KeeperException.ConnectionLossException();
+ }
+ return createdPath;
}
- return createdPath;
}
- }
- );
+ );
trace.commit();
return returnPath;
}
- private String findProtectedNodeInForeground(final String path) throws Exception
+ private String findProtectedNodeInForeground(final String path) throws Exception
{
- TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
+ TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
- String returnPath = RetryLoop.callWithRetry
- (
- client.getZookeeperClient(),
- new Callable<String>()
- {
- @Override
- public String call() throws Exception
+ String returnPath = RetryLoop.callWithRetry
+ (
+ client.getZookeeperClient(),
+ new Callable<String>()
{
- String foundNode = null;
- try
+ @Override
+ public String call() throws Exception
{
- final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
- List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
-
- final String protectedPrefix = getProtectedPrefix();
- foundNode = Iterables.find
- (
- children,
- new Predicate<String>()
- {
- @Override
- public boolean apply(String node)
- {
- return node.startsWith(protectedPrefix);
- }
- },
- null
- );
- if ( foundNode != null )
+ String foundNode = null;
+ try
{
- foundNode = ZKPaths.makePath(pathAndNode.getPath(), foundNode);
+ final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+ List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
+
+ foundNode = findNode(children, pathAndNode.getPath(), protectedId);
}
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ return foundNode;
}
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
- }
- return foundNode;
}
- }
- );
+ );
trace.commit();
return returnPath;
}
- private String adjustPath(String path) throws Exception
+ private String adjustPath(String path) throws Exception
{
if ( doProtected )
{
- ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
- String name = getProtectedPrefix() + pathAndNode.getNode();
+ ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+ String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
path = ZKPaths.makePath(pathAndNode.getPath(), name);
}
return path;
}
+
+ /**
+ * Attempt to delete a protected znode
+ *
+ * @param path the path
+ * @param protectedId the protected id
+ * @param callback callback to use, <code>null</code> to create a new one
+ */
+ private void findAndDeleteProtectedNodeInBackground(String path, String protectedId, FindProtectedNodeCB callback)
+ {
+ if ( client.isStarted() )
+ {
+ if ( callback == null )
+ {
+ callback = new FindProtectedNodeCB(path, protectedId);
+ }
+ try
+ {
+ client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(path).getPath());
+ }
+ catch ( Exception e )
+ {
+ findAndDeleteProtectedNodeInBackground(path, protectedId, callback);
+ }
+ }
+ }
+
+ private class FindProtectedNodeCB implements BackgroundCallback
+ {
+ final String path;
+ final String protectedId;
+
+ private FindProtectedNodeCB(String path, String protectedId)
+ {
+ this.path = path;
+ this.protectedId = protectedId;
+ }
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(), protectedId);
+ if ( node != null )
+ {
+ client.delete().guaranteed().inBackground().forPath(node);
+ }
+ }
+ else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+ {
+ // retry
+ findAndDeleteProtectedNodeInBackground(path, protectedId, this);
+ }
+ }
+ }
+
+ /**
+ * Attempt to find the znode that matches the given path and protected id
+ *
+ * @param children a list of candidates znodes
+ * @param path the path
+ * @param protectedId the protected id
+ * @return the absolute path of the znode or <code>null</code> if it is not found
+ */
+ private static String findNode(final List<String> children, final String path, final String protectedId)
+ {
+ final String protectedPrefix = getProtectedPrefix(protectedId);
+ String foundNode = Iterables.find
+ (
+ children,
+ new Predicate<String>()
+ {
+ @Override
+ public boolean apply(String node)
+ {
+ return node.startsWith(protectedPrefix);
+ }
+ },
+ null
+ );
+ if ( foundNode != null )
+ {
+ foundNode = ZKPaths.makePath(path, foundNode);
+ }
+ return foundNode;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
new file mode 100644
index 0000000..5f10c5e
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A connection factory that will behave like the NIOServerCnxnFactory except that
+ * it will unexpectedly close the connection right after the <b>first</b> znode has
+ * been created in Zookeeper.
+ * Subsequent create operations will succeed.
+ */
+public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
+{
+ public static final String CHAOS_ZNODE = "/mylock";
+ public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/";
+
+ private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class);
+
+ /* How long after the first error, connections are rejected */
+ public static final long LOCKOUT_DURATION_MS = 6000;
+
+ public ChaosMonkeyCnxnFactory() throws IOException
+ {
+ }
+
+ @Override
+ public void startup(ZooKeeperServer zks) throws IOException, InterruptedException
+ {
+ super.startup(new ChaosMonkeyZookeeperServer(zks));
+ }
+
+ /**
+ * Build a connection with a Chaos Monkey ZookeeperServer
+ */
+ protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
+ {
+ return new NIOServerCnxn(zkServer, sock, sk, this);
+ }
+
+ public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer
+ {
+ private long firstError = 0;
+
+ public ChaosMonkeyZookeeperServer(ZooKeeperServer zks)
+ {
+ setTxnLogFactory(zks.getTxnLogFactory());
+ setTickTime(zks.getTickTime());
+ setMinSessionTimeout(zks.getMinSessionTimeout());
+ setMaxSessionTimeout(zks.getMaxSessionTimeout());
+ }
+
+ @Override
+ public void submitRequest(Request si)
+ {
+ long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
+ if ( si.type != ZooDefs.OpCode.createSession && si.type != ZooDefs.OpCode.sync && si.type != ZooDefs.OpCode.ping
+ && firstError != 0 && remaining > 0 )
+ {
+ log.debug("Rejected : " + si.toString());
+ // Still reject request
+ log.debug("Still not ready for " + remaining + "ms");
+ ((NIOServerCnxn)si.cnxn).close();
+ return;
+ }
+ // Submit the request to the legacy Zookeeper server
+ log.debug("Applied : " + si.toString());
+ super.submitRequest(si);
+ // Raise an error if a lock is created
+ if ( si.type == ZooDefs.OpCode.create )
+ {
+ CreateRequest createRequest = new CreateRequest();
+ try
+ {
+ ByteBuffer duplicate = si.request.duplicate();
+ duplicate.rewind();
+ ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
+ if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX)
+ && firstError == 0 )
+ {
+ firstError = System.currentTimeMillis();
+ // The znode has been created, close the connection and don't tell it to client
+ log.warn("Closing connection right after " + createRequest.getPath() + " creation");
+ ((NIOServerCnxn)si.cnxn).close();
+ }
+ }
+ catch ( Exception e )
+ {
+ // Should not happen
+ ((NIOServerCnxn)si.cnxn).close();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
new file mode 100644
index 0000000..e89c958
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases designed after CURATOR-45
+ */
+public class TestLeaderSelectorEdges extends BaseClassForTests
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @BeforeClass
+ public static void setCNXFactory()
+ {
+ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName());
+ }
+
+ @AfterClass
+ public static void resetCNXFactory()
+ {
+ System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+ }
+
+ /**
+ * Create a LeaderSelector but close the connection right after the "lock" znode
+ * has been created.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void flappingTest() throws Exception
+ {
+ final CuratorFramework client =
+ CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .retryPolicy(new RetryNTimes(1, 500))
+ .sessionTimeoutMs(30000)
+ .build();
+
+ final TestLeaderSelectorListener listener = new TestLeaderSelectorListener();
+ LeaderSelector leaderSelector1 =
+ new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
+ LeaderSelector leaderSelector2 = null;
+
+ client.start();
+ try
+ {
+ client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+ leaderSelector1.start();
+ // At this point the ChaosMonkeyZookeeperServer must close the connection
+ // right after the lock znode is created.
+ Assert.assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
+ // Check that leader ship has failed
+ Assert.assertEquals(listener.takeLeadership.getCount(), 1);
+ // Wait FailedDelete
+ Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+ // Check that there is no znode
+ final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+ Assert.assertEquals(children, 0,
+ "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+ // Check that a new LeaderSelector can be started
+ leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE,
+ listener);
+ leaderSelector2.start();
+ Assert.assertTrue(listener.takeLeadership.await(1, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ try
+ {
+ leaderSelector1.close();
+ }
+ catch ( IllegalStateException e )
+ {
+ Assert.fail(e.getMessage());
+ }
+ try
+ {
+ if ( leaderSelector2 != null )
+ {
+ leaderSelector2.close();
+ }
+ }
+ catch ( IllegalStateException e )
+ {
+ Assert.fail(e.getMessage());
+ }
+ client.close();
+ }
+ }
+
+ private class TestLeaderSelectorListener implements LeaderSelectorListener
+ {
+ final CountDownLatch takeLeadership = new CountDownLatch(1);
+ final CountDownLatch reconnected = new CountDownLatch(1);
+
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception
+ {
+ log.info("-->takeLeadership({})", client.toString());
+ takeLeadership.countDown();
+ log.info("<--takeLeadership({})", client.toString());
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnected.countDown();
+ }
+ }
+
+ }
+
+ /**
+ * Create a protected node in background with a retry policy
+ */
+ @Test
+ public void createProtectedNodeInBackgroundTest() throws Exception
+ {
+ final CuratorFramework client =
+ CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .retryPolicy(new RetryNTimes(2, 1))
+ .connectionTimeoutMs(100)
+ .sessionTimeoutMs(60000)
+ .build();
+ final CountDownLatch latch = new CountDownLatch(1);
+ client.start();
+ try
+ {
+ client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+ client.create()
+ .withProtection()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+ .inBackground(
+ new BackgroundCallback()
+ {
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ log.info("Receive event {}", event.toString());
+ if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+ {
+ latch.countDown();
+ }
+ }
+ }
+ )
+ .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+ // Wait for the znode to be deleted
+ Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+ // Check that there is no znode
+ final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+ Assert.assertEquals(children, 0,
+ "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+ }
+ finally
+ {
+ client.close();
+ }
+
+ }
+
+ /**
+ * Same test as above but without a retry policy
+ */
+ @Test
+ public void createProtectedNodeInBackgroundTestNoRetry() throws Exception
+ {
+ final CuratorFramework client =
+ CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .retryPolicy(new RetryNTimes(0, 0))
+ .connectionTimeoutMs(100)
+ .sessionTimeoutMs(60000)
+ .build();
+ final CountDownLatch latch = new CountDownLatch(1);
+ client.start();
+ try
+ {
+ client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+ client.create()
+ .withProtection()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+ .inBackground(
+ new BackgroundCallback()
+ {
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ log.info("Receive event {}", event.toString());
+ if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+ {
+ latch.countDown();
+ }
+ }
+ }
+ )
+ .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+ Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+ // Wait for the znode to be deleted
+ Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+ // Check that there is no znode
+ final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+ Assert.assertEquals(children, 0,
+ "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+ }
+ finally
+ {
+ client.close();
+ }
+
+ }
+}