You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/25 17:59:47 UTC

git commit: Work around an edge case in protected mode. Make sure the created node gets deleted if retries fail during connection loss

Updated Branches:
  refs/heads/CURATOR-45 [created] 9705b4114


Work around an edge case in protected mode. Make sure the created node gets deleted if retries fail during connection loss


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9705b411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9705b411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9705b411

Branch: refs/heads/CURATOR-45
Commit: 9705b4114433b5776ea40c810fbdec1f8151e17d
Parents: 10eb1ef
Author: randgalt <ra...@apache.org>
Authored: Wed Sep 25 09:01:00 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Wed Sep 25 09:01:00 2013 -0700

----------------------------------------------------------------------
 .../framework/imps/CreateBuilderImpl.java       | 352 +++++++++++++------
 .../recipes/leader/ChaosMonkeyCnxnFactory.java  | 124 +++++++
 .../recipes/leader/TestLeaderSelectorEdges.java | 255 ++++++++++++++
 3 files changed, 615 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee99074..ebd342f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,6 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
 import org.apache.curator.framework.api.transaction.OperationType;
@@ -41,20 +43,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
 {
-    private final CuratorFrameworkImpl      client;
-    private CreateMode                      createMode;
-    private Backgrounding                   backgrounding;
-    private boolean                         createParentsIfNeeded;
-    private boolean                         doProtected;
-    private boolean                         compress;
-    private String                          protectedId;
-    private ACLing                          acling;
+    private final CuratorFrameworkImpl client;
+    private CreateMode createMode;
+    private Backgrounding backgrounding;
+    private boolean createParentsIfNeeded;
+    private boolean doProtected;
+    private boolean compress;
+    private String protectedId;
+    private ACLing acling;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
 
     @VisibleForTesting
-    static final String         PROTECTED_PREFIX = "_c_";
+    static final String PROTECTED_PREFIX = "_c_";
 
     CreateBuilderImpl(CuratorFrameworkImpl client)
     {
@@ -68,7 +70,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
         protectedId = null;
     }
 
-    TransactionCreateBuilder        asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+    TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
     {
         return new TransactionCreateBuilder()
         {
@@ -95,7 +97,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             @Override
             public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
             {
-                String      fixedPath = client.fixForNamespace(path);
+                String fixedPath = client.fixForNamespace(path);
                 transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
                 return curatorTransaction;
             }
@@ -417,60 +419,85 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             data = client.getCompressionProvider().compress(givenPath, data);
         }
 
-        final String    adjustedPath = adjustPath(client.fixForNamespace(givenPath));
+        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
 
-        String  returnPath = null;
+        String returnPath = null;
         if ( backgrounding.inBackground() )
         {
             pathInBackground(adjustedPath, data, givenPath);
         }
         else
         {
-            returnPath = pathInForeground(adjustedPath, data);
-            returnPath = client.unfixForNamespace(returnPath);
+            String path = protectedPathInForeground(adjustedPath, data);
+            returnPath = client.unfixForNamespace(path);
         }
         return returnPath;
     }
 
+    private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception // pathInForeground() + CURATOR-45 cleanup
+    {
+        try
+        {
+            return pathInForeground(adjustedPath, data);
+        }
+        catch ( KeeperException.ConnectionLossException e )
+        {
+            if ( protectedId != null )
+            {
+                /*
+                 * CURATOR-45: it is unknown whether the create succeeded before the connection was lost,
+                 * so schedule a background search-and-delete for the protected znode.
+                 */
+                findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
+                /*
+                 * The current UUID is now scheduled for deletion, so it must not be reused.
+                 * Generate a fresh one in case this builder is used again.
+                 */
+                protectedId = UUID.randomUUID().toString();
+            }
+            throw e; // the caller still sees the original connection loss
+        }
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
     {
-        final TimeTrace   trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
+        final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
         client.getZooKeeper().create
-        (
-            operationAndData.getData().getPath(),
-            operationAndData.getData().getData(),
-            acling.getAclList(operationAndData.getData().getPath()),
-            createMode,
-            new AsyncCallback.StringCallback()
-            {
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name)
+            (
+                operationAndData.getData().getPath(),
+                operationAndData.getData().getData(),
+                acling.getAclList(operationAndData.getData().getPath()),
+                createMode,
+                new AsyncCallback.StringCallback()
                 {
-                    trace.commit();
-
-                    if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
-                    {
-                        backgroundCreateParentsThenNode(operationAndData);
-                    }
-                    else
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name)
                     {
-                        sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        trace.commit();
+
+                        if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
+                        {
+                            backgroundCreateParentsThenNode(operationAndData);
+                        }
+                        else
+                        {
+                            sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        }
                     }
-                }
-            },
-            backgrounding.getContext()
-        );
+                },
+                backgrounding.getContext()
+            );
     }
 
-    private String getProtectedPrefix()
+    private static String getProtectedPrefix(String protectedId)
     {
         return PROTECTED_PREFIX + protectedId + "-";
     }
 
     private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
     {
-        BackgroundOperation<PathAndBytes>     operation = new BackgroundOperation<PathAndBytes>()
+        BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
         {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
@@ -486,7 +513,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                 client.queueOperation(mainOperationAndData);
             }
         };
-        OperationAndData<PathAndBytes>        parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
+        OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
         client.queueOperation(parentOperation);
     }
 
@@ -558,16 +585,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
     private void pathInBackground(final String path, final byte[] data, final String givenPath)
     {
         final AtomicBoolean firstTime = new AtomicBoolean(true);
-        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
+        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(),
+            new OperationAndData.ErrorCallback<PathAndBytes>()
+            {
+                public void retriesExhausted(OperationAndData<PathAndBytes> operationAndData)
+                {
+                    if ( doProtected )
+                    {
+                        // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
+                        findAndDeleteProtectedNodeInBackground(path, protectedId, null);
+                        // assign a new id if this builder is used again later
+                        protectedId = UUID.randomUUID().toString();
+                    }
+                }
+            },
+            backgrounding.getContext())
         {
             @Override
             void callPerformBackgroundOperation() throws Exception
             {
-                boolean   callSuper = true;
-                boolean   localFirstTime = firstTime.getAndSet(false);
+                boolean callSuper = true;
+                boolean localFirstTime = firstTime.getAndSet(false);
                 if ( !localFirstTime && doProtected )
                 {
-                    String      createdPath = findProtectedNodeInForeground(path);
+                    String createdPath = null;
+                    try
+                    {
+                        createdPath = findProtectedNodeInForeground(path);
+                    }
+                    catch ( KeeperException.ConnectionLossException e )
+                    {
+                        sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, this);
+                        callSuper = false;
+                    }
                     if ( createdPath != null )
                     {
                         try
@@ -600,117 +650,187 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
 
     private String pathInForeground(final String path, final byte[] data) throws Exception
     {
-        TimeTrace               trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
 
-        final AtomicBoolean     firstTime = new AtomicBoolean(true);
-        String                  returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        final AtomicBoolean firstTime = new AtomicBoolean(true);
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    boolean   localFirstTime = firstTime.getAndSet(false);
-
-                    String    createdPath = null;
-                    if ( !localFirstTime && doProtected )
+                    @Override
+                    public String call() throws Exception
                     {
-                        createdPath = findProtectedNodeInForeground(path);
-                    }
+                        boolean localFirstTime = firstTime.getAndSet(false);
 
-                    if ( createdPath == null )
-                    {
-                        try
+                        String createdPath = null;
+                        if ( !localFirstTime && doProtected )
                         {
-                            createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                            createdPath = findProtectedNodeInForeground(path);
                         }
-                        catch ( KeeperException.NoNodeException e )
+
+                        if ( createdPath == null )
                         {
-                            if ( createParentsIfNeeded )
+                            try
                             {
-                                ZKPaths.mkdirs(client.getZooKeeper(), path, false);
                                 createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                             }
-                            else
+                            catch ( KeeperException.NoNodeException e )
                             {
-                                throw e;
+                                if ( createParentsIfNeeded )
+                                {
+                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false);
+                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                                }
+                                else
+                                {
+                                    throw e;
+                                }
                             }
                         }
-                    }
 
-                    if ( failNextCreateForTesting )
-                    {
-                        failNextCreateForTesting = false;
-                        throw new KeeperException.ConnectionLossException();
+                        if ( failNextCreateForTesting )
+                        {
+                            failNextCreateForTesting = false;
+                            throw new KeeperException.ConnectionLossException();
+                        }
+                        return createdPath;
                     }
-                    return createdPath;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  findProtectedNodeInForeground(final String path) throws Exception
+    private String findProtectedNodeInForeground(final String path) throws Exception
     {
-        TimeTrace       trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
 
-        String          returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    String foundNode = null;
-                    try
+                    @Override
+                    public String call() throws Exception
                     {
-                        final ZKPaths.PathAndNode   pathAndNode = ZKPaths.getPathAndNode(path);
-                        List<String>                children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
-
-                        final String                protectedPrefix = getProtectedPrefix();
-                        foundNode = Iterables.find
-                        (
-                            children,
-                            new Predicate<String>()
-                            {
-                                @Override
-                                public boolean apply(String node)
-                                {
-                                    return node.startsWith(protectedPrefix);
-                                }
-                            },
-                            null
-                        );
-                        if ( foundNode != null )
+                        String foundNode = null;
+                        try
                         {
-                            foundNode = ZKPaths.makePath(pathAndNode.getPath(), foundNode);
+                            final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+                            List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
+
+                            foundNode = findNode(children, pathAndNode.getPath(), protectedId);
                         }
+                        catch ( KeeperException.NoNodeException ignore )
+                        {
+                            // ignore
+                        }
+                        return foundNode;
                     }
-                    catch ( KeeperException.NoNodeException ignore )
-                    {
-                        // ignore
-                    }
-                    return foundNode;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  adjustPath(String path) throws Exception
+    private String adjustPath(String path) throws Exception
     {
         if ( doProtected )
         {
-            ZKPaths.PathAndNode     pathAndNode = ZKPaths.getPathAndNode(path);
-            String                  name = getProtectedPrefix() + pathAndNode.getNode();
+            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
             path = ZKPaths.makePath(pathAndNode.getPath(), name);
         }
         return path;
     }
+
+    /**
+     * Attempt to delete a protected znode: list the parent of path in the background and let FindProtectedNodeCB
+     * delete the child matching protectedId. Does nothing unless the client is started.
+     * @param path        the protected path; the children of its parent are searched
+     * @param protectedId the protected id whose prefix identifies the node to delete
+     * @param callback    callback to use, <code>null</code> to create a new one
+     */
+    private void findAndDeleteProtectedNodeInBackground(String path, String protectedId, FindProtectedNodeCB callback)
+    {
+        if ( client.isStarted() )
+        {
+            if ( callback == null )
+            {
+                callback = new FindProtectedNodeCB(path, protectedId);
+            }
+            try
+            {
+                client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(path).getPath());
+            }
+            catch ( Exception e )
+            {
+                findAndDeleteProtectedNodeInBackground(path, protectedId, callback); // NOTE(review): immediate retry, no limit/backoff
+            }
+        }
+    }
+
+    private class FindProtectedNodeCB implements BackgroundCallback // deletes the protected node once its parent is listed
+    {
+        final String path;
+        final String protectedId;
+
+        private FindProtectedNodeCB(String path, String protectedId)
+        {
+            this.path = path;
+            this.protectedId = protectedId;
+        }
+
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception // 'client' shadows the builder's field
+        {
+            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+            {
+                final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(), protectedId);
+                if ( node != null )
+                {
+                    client.delete().guaranteed().inBackground().forPath(node);
+                }
+            }
+            else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+            {
+                // connection lost before the children were listed: issue the search again with this callback
+                findAndDeleteProtectedNodeInBackground(path, protectedId, this);
+            } // any other result code is ignored: no further cleanup attempt is made
+        }
+    }
+
+    /**
+     * Find the first child whose name starts with the protected prefix built from protectedId
+     *
+     * @param children    candidate child znode names (node names, not full paths)
+     * @param path        the parent path; joined with the matching child name to form the result
+     * @param protectedId the protected id
+     * @return the absolute path of the znode or <code>null</code> if it is not found
+     */
+    private static String findNode(final List<String> children, final String path, final String protectedId)
+    {
+        final String protectedPrefix = getProtectedPrefix(protectedId);
+        String foundNode = Iterables.find
+            (
+                children,
+                new Predicate<String>()
+                {
+                    @Override
+                    public boolean apply(String node)
+                    {
+                        return node.startsWith(protectedPrefix);
+                    }
+                },
+                null
+            );
+        if ( foundNode != null )
+        {
+            foundNode = ZKPaths.makePath(path, foundNode);
+        }
+        return foundNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
new file mode 100644
index 0000000..5f10c5e
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A connection factory that will behave like the NIOServerCnxnFactory except that
+ * it will unexpectedly close the connection right after the <b>first</b> znode has
+ * been created in Zookeeper.
+ * Subsequent create operations will succeed.
+ */
+public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
+{
+    public static final String CHAOS_ZNODE = "/mylock";
+    public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/";
+
+    private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class);
+
+    /* How long (in ms) after the first injected failure client requests keep being rejected */
+    public static final long LOCKOUT_DURATION_MS = 6000;
+
+    public ChaosMonkeyCnxnFactory() throws IOException
+    {
+    }
+
+    @Override
+    public void startup(ZooKeeperServer zks) throws IOException, InterruptedException
+    {
+        super.startup(new ChaosMonkeyZookeeperServer(zks));
+    }
+
+    /**
+     * Build a connection with the Chaos Monkey ZookeeperServer handed to super.startup()
+     */
+    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
+    {
+        return new NIOServerCnxn(zkServer, sock, sk, this);
+    }
+
+    public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer
+    {
+        private volatile long firstError = 0; // 0 until the first connection drop is injected
+
+        public ChaosMonkeyZookeeperServer(ZooKeeperServer zks)
+        {
+            setTxnLogFactory(zks.getTxnLogFactory());
+            setTickTime(zks.getTickTime());
+            setMinSessionTimeout(zks.getMinSessionTimeout());
+            setMaxSessionTimeout(zks.getMaxSessionTimeout());
+        }
+
+        @Override
+        public void submitRequest(Request si)
+        {
+            long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
+            if ( si.cnxn != null && remaining > 0 && si.type != ZooDefs.OpCode.createSession
+                && si.type != ZooDefs.OpCode.sync && si.type != ZooDefs.OpCode.ping )
+            {
+                // Inside the lockout window: drop the client's connection (requests without one pass through)
+                log.debug("Rejected : " + si.toString());
+                log.debug("Still not ready for " + remaining + "ms");
+                ((NIOServerCnxn)si.cnxn).close();
+                return;
+            }
+            // Hand the request to the regular ZooKeeperServer processing
+            log.debug("Applied : " + si.toString());
+            super.submitRequest(si);
+            // Drop the connection right after the first create under CHAOS_ZNODE_PREFIX
+            if ( si.type == ZooDefs.OpCode.create )
+            {
+                CreateRequest createRequest = new CreateRequest();
+                try
+                {
+                    ByteBuffer duplicate = si.request.duplicate();
+                    duplicate.rewind();
+                    ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
+                    if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX)
+                        && firstError == 0 )
+                    {
+                        firstError = System.currentTimeMillis();
+                        // The znode has been created, close the connection and don't tell it to client
+                        log.warn("Closing connection right after " + createRequest.getPath() + " creation");
+                        ((NIOServerCnxn)si.cnxn).close();
+                    }
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not decode create request, closing connection", e); // should not happen
+                    ((NIOServerCnxn)si.cnxn).close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
new file mode 100644
index 0000000..e89c958
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases designed after CURATOR-45
+ */
+public class TestLeaderSelectorEdges extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @BeforeClass
+    public static void setCNXFactory()
+    {
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName());
+    }
+
+    @AfterClass
+    public static void resetCNXFactory()
+    {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+
+    /**
+     * Create a LeaderSelector but close the connection right after the "lock" znode
+     * has been created. The orphaned lock znode must then be removed and a second
+     * LeaderSelector on the same path must be able to take leadership.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void flappingTest() throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(1, 500))
+                .sessionTimeoutMs(30000)
+                .build();
+
+        final TestLeaderSelectorListener listener = new TestLeaderSelectorListener();
+        // A dedicated listener for the second selector, so that the final assertion
+        // cannot be satisfied by a late leadership grant to leaderSelector1
+        final TestLeaderSelectorListener listener2 = new TestLeaderSelectorListener();
+        // Selectors are only assigned right before start(): closing a selector that was
+        // never started throws IllegalStateException and would hide the real failure
+        LeaderSelector leaderSelector1 = null;
+        LeaderSelector leaderSelector2 = null;
+
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            leaderSelector1 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
+            leaderSelector1.start();
+            // At this point the ChaosMonkeyZookeeperServer must close the connection
+            // right after the lock znode is created.
+            Assert.assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
+            // Check that leadership has failed
+            Assert.assertEquals(listener.takeLeadership.getCount(), 1);
+            // Wait for the failed lock znode to be deleted, then check that there is no znode
+            assertNoChildrenAfterLockout(client);
+            // Check that a new LeaderSelector can be started
+            leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener2);
+            leaderSelector2.start();
+            Assert.assertTrue(listener2.takeLeadership.await(1, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            try
+            {
+                closeSelector(leaderSelector1);
+            }
+            finally
+            {
+                try
+                {
+                    closeSelector(leaderSelector2);
+                }
+                finally
+                {
+                    // Always release the client, even if a selector failed to close
+                    client.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Close a started LeaderSelector, failing the test if close() reports an illegal state.
+     *
+     * @param leaderSelector the selector to close; ignored when null (never started)
+     */
+    private static void closeSelector(LeaderSelector leaderSelector)
+    {
+        if ( leaderSelector == null )
+        {
+            return;
+        }
+        try
+        {
+            leaderSelector.close();
+        }
+        catch ( IllegalStateException e )
+        {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Sleep for twice ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS, giving the client time to
+     * delete the znode orphaned by the dropped connection, then assert that no znode
+     * is left under {@link ChaosMonkeyCnxnFactory#CHAOS_ZNODE}.
+     *
+     * @param client a started client
+     * @throws Exception
+     */
+    private static void assertNoChildrenAfterLockout(CuratorFramework client) throws Exception
+    {
+        Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+        final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+        Assert.assertEquals(children, 0,
+            "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+    }
+
+    /** Records leadership grants and RECONNECTED state changes in latches. */
+    private class TestLeaderSelectorListener implements LeaderSelectorListener
+    {
+        final CountDownLatch takeLeadership = new CountDownLatch(1);
+        final CountDownLatch reconnected = new CountDownLatch(1);
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception
+        {
+            log.info("-->takeLeadership({})", client.toString());
+            takeLeadership.countDown();
+            log.info("<--takeLeadership({})", client.toString());
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                reconnected.countDown();
+            }
+        }
+    }
+
+    /**
+     * Create a protected node in background with a retry policy
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTest() throws Exception
+    {
+        createProtectedNodeInBackground(2, 1);
+    }
+
+    /**
+     * Same test as above but without a retry policy
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTestNoRetry() throws Exception
+    {
+        createProtectedNodeInBackground(0, 0);
+    }
+
+    /**
+     * Create a protected EPHEMERAL_SEQUENTIAL node in background while the
+     * ChaosMonkeyCnxnFactory closes the connection right after the node is created,
+     * then check that the orphaned node has been deleted.
+     *
+     * @param retryCount            number of retries given to {@link RetryNTimes}
+     * @param sleepMsBetweenRetries sleep between retries given to {@link RetryNTimes}
+     * @throws Exception
+     */
+    private void createProtectedNodeInBackground(int retryCount, int sleepMsBetweenRetries) throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(retryCount, sleepMsBetweenRetries))
+                .connectionTimeoutMs(100)
+                .sessionTimeoutMs(60000)
+                .build();
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            client.create()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(
+                    new BackgroundCallback()
+                    {
+                        public void processResult(CuratorFramework callbackClient, CuratorEvent event)
+                            throws Exception
+                        {
+                            log.info("Receive event {}", event.toString());
+                            // Only CONNECTIONLOSS (the expected outcome of the dropped connection) releases the latch
+                            if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                             )
+                .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+            assertNoChildrenAfterLockout(client);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+}