You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/25 18:09:38 UTC

[1/3] git commit: Work around an edge case in protected mode: make sure the created node gets deleted if retries fail during connection loss

Updated Branches:
  refs/heads/master 10eb1efe5 -> 9460a467e


Work around an edge case in protected mode: make sure the created node gets deleted if retries fail during connection loss


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9705b411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9705b411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9705b411

Branch: refs/heads/master
Commit: 9705b4114433b5776ea40c810fbdec1f8151e17d
Parents: 10eb1ef
Author: randgalt <ra...@apache.org>
Authored: Wed Sep 25 09:01:00 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Wed Sep 25 09:01:00 2013 -0700

----------------------------------------------------------------------
 .../framework/imps/CreateBuilderImpl.java       | 352 +++++++++++++------
 .../recipes/leader/ChaosMonkeyCnxnFactory.java  | 124 +++++++
 .../recipes/leader/TestLeaderSelectorEdges.java | 255 ++++++++++++++
 3 files changed, 615 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee99074..ebd342f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,6 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
 import org.apache.curator.framework.api.transaction.OperationType;
@@ -41,20 +43,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
 {
-    private final CuratorFrameworkImpl      client;
-    private CreateMode                      createMode;
-    private Backgrounding                   backgrounding;
-    private boolean                         createParentsIfNeeded;
-    private boolean                         doProtected;
-    private boolean                         compress;
-    private String                          protectedId;
-    private ACLing                          acling;
+    private final CuratorFrameworkImpl client;
+    private CreateMode createMode;
+    private Backgrounding backgrounding;
+    private boolean createParentsIfNeeded;
+    private boolean doProtected;
+    private boolean compress;
+    private String protectedId;
+    private ACLing acling;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
 
     @VisibleForTesting
-    static final String         PROTECTED_PREFIX = "_c_";
+    static final String PROTECTED_PREFIX = "_c_";
 
     CreateBuilderImpl(CuratorFrameworkImpl client)
     {
@@ -68,7 +70,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
         protectedId = null;
     }
 
-    TransactionCreateBuilder        asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+    TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
     {
         return new TransactionCreateBuilder()
         {
@@ -95,7 +97,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             @Override
             public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
             {
-                String      fixedPath = client.fixForNamespace(path);
+                String fixedPath = client.fixForNamespace(path);
                 transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
                 return curatorTransaction;
             }
@@ -417,60 +419,85 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             data = client.getCompressionProvider().compress(givenPath, data);
         }
 
-        final String    adjustedPath = adjustPath(client.fixForNamespace(givenPath));
+        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
 
-        String  returnPath = null;
+        String returnPath = null;
         if ( backgrounding.inBackground() )
         {
             pathInBackground(adjustedPath, data, givenPath);
         }
         else
         {
-            returnPath = pathInForeground(adjustedPath, data);
-            returnPath = client.unfixForNamespace(returnPath);
+            String path = protectedPathInForeground(adjustedPath, data);
+            returnPath = client.unfixForNamespace(path);
         }
         return returnPath;
     }
 
+    /**
+     * Foreground create wrapper. Delegates to {@code pathInForeground}; if that ultimately fails
+     * with a connection loss while a protected id is set, it schedules a background
+     * find-and-delete for any node that may have been created under that id (CURATOR-45). It then
+     * replaces the id, so a reused builder cannot collide with the pending cleanup.
+     *
+     * @param adjustedPath the namespaced path, already carrying the protected prefix when protected mode is on
+     * @param data         the node payload
+     * @return the path returned by {@code pathInForeground}
+     * @throws Exception anything {@code pathInForeground} throws; a connection loss is rethrown after cleanup is scheduled
+     */
+    private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
+    {
+        try
+        {
+            return pathInForeground(adjustedPath, data);
+        }
+        catch ( KeeperException.ConnectionLossException connectionLoss )
+        {
+            final boolean hasProtectedId = (protectedId != null);
+            if ( hasProtectedId )
+            {
+                // The create may or may not have reached the server, so look for the node
+                // in the background and delete it if it is there.
+                findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
+
+                // The old id now belongs to the pending cleanup; a later use of this builder needs a fresh one.
+                protectedId = UUID.randomUUID().toString();
+            }
+            throw connectionLoss;
+        }
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
     {
-        final TimeTrace   trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
+        final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
         client.getZooKeeper().create
-        (
-            operationAndData.getData().getPath(),
-            operationAndData.getData().getData(),
-            acling.getAclList(operationAndData.getData().getPath()),
-            createMode,
-            new AsyncCallback.StringCallback()
-            {
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name)
+            (
+                operationAndData.getData().getPath(),
+                operationAndData.getData().getData(),
+                acling.getAclList(operationAndData.getData().getPath()),
+                createMode,
+                new AsyncCallback.StringCallback()
                 {
-                    trace.commit();
-
-                    if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
-                    {
-                        backgroundCreateParentsThenNode(operationAndData);
-                    }
-                    else
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name)
                     {
-                        sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        trace.commit();
+
+                        if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
+                        {
+                            backgroundCreateParentsThenNode(operationAndData);
+                        }
+                        else
+                        {
+                            sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        }
                     }
-                }
-            },
-            backgrounding.getContext()
-        );
+                },
+                backgrounding.getContext()
+            );
     }
 
-    private String getProtectedPrefix()
+    private static String getProtectedPrefix(String protectedId)
     {
         return PROTECTED_PREFIX + protectedId + "-";
     }
 
     private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
     {
-        BackgroundOperation<PathAndBytes>     operation = new BackgroundOperation<PathAndBytes>()
+        BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
         {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
@@ -486,7 +513,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                 client.queueOperation(mainOperationAndData);
             }
         };
-        OperationAndData<PathAndBytes>        parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
+        OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
         client.queueOperation(parentOperation);
     }
 
@@ -558,16 +585,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
     private void pathInBackground(final String path, final byte[] data, final String givenPath)
     {
         final AtomicBoolean firstTime = new AtomicBoolean(true);
-        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
+        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(),
+            new OperationAndData.ErrorCallback<PathAndBytes>()
+            {
+                public void retriesExhausted(OperationAndData<PathAndBytes> operationAndData)
+                {
+                    if ( doProtected )
+                    {
+                        // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
+                        findAndDeleteProtectedNodeInBackground(path, protectedId, null);
+                        // assign a new id if this builder is used again later
+                        protectedId = UUID.randomUUID().toString();
+                    }
+                }
+            },
+            backgrounding.getContext())
         {
             @Override
             void callPerformBackgroundOperation() throws Exception
             {
-                boolean   callSuper = true;
-                boolean   localFirstTime = firstTime.getAndSet(false);
+                boolean callSuper = true;
+                boolean localFirstTime = firstTime.getAndSet(false);
                 if ( !localFirstTime && doProtected )
                 {
-                    String      createdPath = findProtectedNodeInForeground(path);
+                    String createdPath = null;
+                    try
+                    {
+                        createdPath = findProtectedNodeInForeground(path);
+                    }
+                    catch ( KeeperException.ConnectionLossException e )
+                    {
+                        sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, this);
+                        callSuper = false;
+                    }
                     if ( createdPath != null )
                     {
                         try
@@ -600,117 +650,187 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
 
     private String pathInForeground(final String path, final byte[] data) throws Exception
     {
-        TimeTrace               trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
 
-        final AtomicBoolean     firstTime = new AtomicBoolean(true);
-        String                  returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        final AtomicBoolean firstTime = new AtomicBoolean(true);
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    boolean   localFirstTime = firstTime.getAndSet(false);
-
-                    String    createdPath = null;
-                    if ( !localFirstTime && doProtected )
+                    @Override
+                    public String call() throws Exception
                     {
-                        createdPath = findProtectedNodeInForeground(path);
-                    }
+                        boolean localFirstTime = firstTime.getAndSet(false);
 
-                    if ( createdPath == null )
-                    {
-                        try
+                        String createdPath = null;
+                        if ( !localFirstTime && doProtected )
                         {
-                            createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                            createdPath = findProtectedNodeInForeground(path);
                         }
-                        catch ( KeeperException.NoNodeException e )
+
+                        if ( createdPath == null )
                         {
-                            if ( createParentsIfNeeded )
+                            try
                             {
-                                ZKPaths.mkdirs(client.getZooKeeper(), path, false);
                                 createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                             }
-                            else
+                            catch ( KeeperException.NoNodeException e )
                             {
-                                throw e;
+                                if ( createParentsIfNeeded )
+                                {
+                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false);
+                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                                }
+                                else
+                                {
+                                    throw e;
+                                }
                             }
                         }
-                    }
 
-                    if ( failNextCreateForTesting )
-                    {
-                        failNextCreateForTesting = false;
-                        throw new KeeperException.ConnectionLossException();
+                        if ( failNextCreateForTesting )
+                        {
+                            failNextCreateForTesting = false;
+                            throw new KeeperException.ConnectionLossException();
+                        }
+                        return createdPath;
                     }
-                    return createdPath;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  findProtectedNodeInForeground(final String path) throws Exception
+    private String findProtectedNodeInForeground(final String path) throws Exception
     {
-        TimeTrace       trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
 
-        String          returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    String foundNode = null;
-                    try
+                    @Override
+                    public String call() throws Exception
                     {
-                        final ZKPaths.PathAndNode   pathAndNode = ZKPaths.getPathAndNode(path);
-                        List<String>                children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
-
-                        final String                protectedPrefix = getProtectedPrefix();
-                        foundNode = Iterables.find
-                        (
-                            children,
-                            new Predicate<String>()
-                            {
-                                @Override
-                                public boolean apply(String node)
-                                {
-                                    return node.startsWith(protectedPrefix);
-                                }
-                            },
-                            null
-                        );
-                        if ( foundNode != null )
+                        String foundNode = null;
+                        try
                         {
-                            foundNode = ZKPaths.makePath(pathAndNode.getPath(), foundNode);
+                            final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+                            List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
+
+                            foundNode = findNode(children, pathAndNode.getPath(), protectedId);
                         }
+                        catch ( KeeperException.NoNodeException ignore )
+                        {
+                            // ignore
+                        }
+                        return foundNode;
                     }
-                    catch ( KeeperException.NoNodeException ignore )
-                    {
-                        // ignore
-                    }
-                    return foundNode;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  adjustPath(String path) throws Exception
+    private String adjustPath(String path) throws Exception
     {
         if ( doProtected )
         {
-            ZKPaths.PathAndNode     pathAndNode = ZKPaths.getPathAndNode(path);
-            String                  name = getProtectedPrefix() + pathAndNode.getNode();
+            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
             path = ZKPaths.makePath(pathAndNode.getPath(), name);
         }
         return path;
     }
+
+    /**
+     * Best-effort background cleanup of a protected znode whose creation outcome is unknown.
+     * Lists the children of the parent of {@code path} in the background. {@link FindProtectedNodeCB}
+     * then deletes the child that carries the protected prefix, if there is one. Does nothing
+     * once the client is no longer started.
+     *
+     * @param path        the (adjusted) path of the node that may have been created
+     * @param protectedId the protected id embedded in the node name
+     * @param callback    callback to use, <code>null</code> to create a new one
+     */
+    private void findAndDeleteProtectedNodeInBackground(String path, String protectedId, FindProtectedNodeCB callback)
+    {
+        // A synchronous failure while submitting the listing is retried, but only a bounded number
+        // of times. Retrying through unbounded recursion turns a persistent failure into a
+        // StackOverflowError, and the callers invoke this from inside their own catch blocks,
+        // where that error would replace the exception being handled.
+        final int maxSubmitAttempts = 3;
+        for ( int attempt = 1; (attempt <= maxSubmitAttempts) && client.isStarted(); ++attempt )
+        {
+            if ( callback == null )
+            {
+                callback = new FindProtectedNodeCB(path, protectedId);
+            }
+            try
+            {
+                client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(path).getPath());
+                return;
+            }
+            catch ( Exception ignored )
+            {
+                // best-effort cleanup: try again on the next iteration, give up after the last attempt
+            }
+        }
+    }
+
+    /**
+     * Callback for the background children listing issued by
+     * {@code findAndDeleteProtectedNodeInBackground}. On success it deletes (guaranteed, in the
+     * background) the child whose name starts with the protected prefix. On connection loss it
+     * re-issues the listing with itself as the callback. Any other result code is ignored.
+     */
+    private class FindProtectedNodeCB implements BackgroundCallback
+    {
+        final String path;
+        final String protectedId;
+
+        private FindProtectedNodeCB(String path, String protectedId)
+        {
+            this.path = path;
+            this.protectedId = protectedId;
+        }
+
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            final int resultCode = event.getResultCode();
+            if ( resultCode == KeeperException.Code.CONNECTIONLOSS.intValue() )
+            {
+                // the listing did not complete - issue it again with this same callback
+                findAndDeleteProtectedNodeInBackground(path, protectedId, this);
+                return;
+            }
+            if ( resultCode != KeeperException.Code.OK.intValue() )
+            {
+                return;
+            }
+
+            final List<String> siblings = event.getChildren();
+            final String parentPath = ZKPaths.getPathAndNode(path).getPath();
+            final String protectedNode = findNode(siblings, parentPath, protectedId);
+            if ( protectedNode != null )
+            {
+                client.delete().guaranteed().inBackground().forPath(protectedNode);
+            }
+        }
+    }
+
+    /**
+     * Looks for the first candidate whose name starts with the protected prefix built from
+     * {@code protectedId}.
+     *
+     * @param children    candidate znode names (not full paths)
+     * @param path        the parent path the candidates live under
+     * @param protectedId the protected id
+     * @return the full path of the first matching child, or <code>null</code> when none matches
+     */
+    private static String findNode(final List<String> children, final String path, final String protectedId)
+    {
+        final String protectedPrefix = getProtectedPrefix(protectedId);
+        for ( String child : children )
+        {
+            if ( child.startsWith(protectedPrefix) )
+            {
+                return ZKPaths.makePath(path, child);
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
new file mode 100644
index 0000000..5f10c5e
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A connection factory that behaves like {@link NIOServerCnxnFactory}, except for one injected
+ * failure. The server it installs closes the client connection right after the <b>first</b>
+ * create request whose path starts with {@link #CHAOS_ZNODE_PREFIX} has been submitted. The
+ * create is still passed on to the server, so the znode can exist even though the client never
+ * receives the response.
+ * <p>
+ * For {@link #LOCKOUT_DURATION_MS} after that failure, every request other than createSession,
+ * sync and ping is rejected by closing its connection. After the lockout, requests are submitted
+ * normally again.
+ */
+public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
+{
+    public static final String CHAOS_ZNODE = "/mylock";
+    public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/";
+
+    private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class);
+
+    /** How long (ms) after the injected failure non-session requests keep being rejected. */
+    public static final long LOCKOUT_DURATION_MS = 6000;
+
+    public ChaosMonkeyCnxnFactory() throws IOException
+    {
+    }
+
+    /**
+     * Starts the factory with a {@link ChaosMonkeyZookeeperServer} that copies the given server's settings.
+     */
+    @Override
+    public void startup(ZooKeeperServer zks) throws IOException, InterruptedException
+    {
+        super.startup(new ChaosMonkeyZookeeperServer(zks));
+    }
+
+    /**
+     * Builds a plain NIOServerCnxn bound to {@code zkServer}. Presumably this is the
+     * ChaosMonkeyZookeeperServer installed by {@link #startup(ZooKeeperServer)} - confirm against
+     * NIOServerCnxnFactory.
+     */
+    @Override
+    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
+    {
+        return new NIOServerCnxn(zkServer, sock, sk, this);
+    }
+
+    /**
+     * ZooKeeperServer that copies the transaction log factory, tick time and session-timeout
+     * bounds of the server it replaces, and injects the connection failure described on
+     * {@link ChaosMonkeyCnxnFactory}.
+     */
+    public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer
+    {
+        /** Time (ms since epoch) of the injected failure, or 0 while none has happened yet. */
+        private long firstError = 0;
+
+        public ChaosMonkeyZookeeperServer(ZooKeeperServer zks)
+        {
+            setTxnLogFactory(zks.getTxnLogFactory());
+            setTickTime(zks.getTickTime());
+            setMinSessionTimeout(zks.getMinSessionTimeout());
+            setMaxSessionTimeout(zks.getMaxSessionTimeout());
+        }
+
+        @Override
+        public void submitRequest(Request si)
+        {
+            // remaining > 0 only after a failure has been injected and while the lockout is still running
+            long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
+            if ( remaining > 0 && !isAlwaysAllowed(si.type) )
+            {
+                log.debug("Rejected : {}", si);
+                log.debug("Still not ready for {}ms", remaining);
+                ((NIOServerCnxn)si.cnxn).close();
+                return;
+            }
+            // Submit the request to the regular ZooKeeper server pipeline
+            log.debug("Applied : {}", si);
+            super.submitRequest(si);
+            if ( si.type == ZooDefs.OpCode.create )
+            {
+                closeAfterFirstChaosCreate(si);
+            }
+        }
+
+        /** Request types that are let through even during the lockout window. */
+        private static boolean isAlwaysAllowed(int type)
+        {
+            return type == ZooDefs.OpCode.createSession || type == ZooDefs.OpCode.sync || type == ZooDefs.OpCode.ping;
+        }
+
+        /** Closes the connection (without a response) after the first create under {@link #CHAOS_ZNODE_PREFIX}. */
+        private void closeAfterFirstChaosCreate(Request si)
+        {
+            CreateRequest createRequest = new CreateRequest();
+            try
+            {
+                // Decode from a duplicate so the original buffer's position is left untouched
+                ByteBuffer duplicate = si.request.duplicate();
+                duplicate.rewind();
+                ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
+                if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX) && firstError == 0 )
+                {
+                    firstError = System.currentTimeMillis();
+                    // The znode has been created; close the connection without telling the client
+                    log.warn("Closing connection right after {} creation", createRequest.getPath());
+                    ((NIOServerCnxn)si.cnxn).close();
+                }
+            }
+            catch ( Exception e )
+            {
+                // Should not happen. Keep dropping the connection, but do not hide the cause.
+                log.error("Could not inspect create request " + si, e);
+                ((NIOServerCnxn)si.cnxn).close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9705b411/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
new file mode 100644
index 0000000..e89c958
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases designed after CURATOR-45
+ */
+public class TestLeaderSelectorEdges extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @BeforeClass
+    public static void setCNXFactory()
+    {
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName());
+    }
+
+    @AfterClass
+    public static void resetCNXFactory()
+    {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+
+    /**
+     * Create a LeaderSelector but close the connection right after the "lock" znode
+     * has been created.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void flappingTest() throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(1, 500))
+                .sessionTimeoutMs(30000)
+                .build();
+
+        final TestLeaderSelectorListener listener = new TestLeaderSelectorListener();
+        LeaderSelector leaderSelector1 =
+            new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
+        LeaderSelector leaderSelector2 = null;
+
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            leaderSelector1.start();
+            // At this point the ChaosMonkeyZookeeperServer must close the connection
+            // right after the lock znode is created.
+            Assert.assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
+            // Check that leader ship has failed
+            Assert.assertEquals(listener.takeLeadership.getCount(), 1);
+            // Wait FailedDelete
+            Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+            // Check that there is no znode
+            final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+            Assert.assertEquals(children, 0,
+                "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+            // Check that a new LeaderSelector can be started
+            leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE,
+                listener);
+            leaderSelector2.start();
+            Assert.assertTrue(listener.takeLeadership.await(1, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            try
+            {
+                leaderSelector1.close();
+            }
+            catch ( IllegalStateException e )
+            {
+                Assert.fail(e.getMessage());
+            }
+            try
+            {
+                if ( leaderSelector2 != null )
+                {
+                    leaderSelector2.close();
+                }
+            }
+            catch ( IllegalStateException e )
+            {
+                Assert.fail(e.getMessage());
+            }
+            client.close();
+        }
+    }
+
+    private class TestLeaderSelectorListener implements LeaderSelectorListener
+    {
+        final CountDownLatch takeLeadership = new CountDownLatch(1);
+        final CountDownLatch reconnected = new CountDownLatch(1);
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception
+        {
+            log.info("-->takeLeadership({})", client.toString());
+            takeLeadership.countDown();
+            log.info("<--takeLeadership({})", client.toString());
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                reconnected.countDown();
+            }
+        }
+
+    }
+
+    /**
+     * Create a protected node in background with a retry policy
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTest() throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(2, 1))
+                .connectionTimeoutMs(100)
+                .sessionTimeoutMs(60000)
+                .build();
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            client.create()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(
+                    new BackgroundCallback()
+                    {
+                        public void processResult(CuratorFramework client, CuratorEvent event)
+                            throws Exception
+                        {
+                            log.info("Receive event {}", event.toString());
+                            if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                             )
+                .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+            // Wait for the znode to be deleted
+            Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+            // Check that there is no znode
+            final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+            Assert.assertEquals(children, 0,
+                "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+        }
+        finally
+        {
+            client.close();
+        }
+
+    }
+
+    /**
+     * Same test as above but without a retry policy
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTestNoRetry() throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(0, 0))
+                .connectionTimeoutMs(100)
+                .sessionTimeoutMs(60000)
+                .build();
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            client.create()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(
+                    new BackgroundCallback()
+                    {
+                        public void processResult(CuratorFramework client, CuratorEvent event)
+                            throws Exception
+                        {
+                            log.info("Receive event {}", event.toString());
+                            if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                             )
+                .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+            // Wait for the znode to be deleted
+            Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+            // Check that there is no znode
+            final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+            Assert.assertEquals(children, 0,
+                "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+        }
+        finally
+        {
+            client.close();
+        }
+
+    }
+}


[2/3] git commit: Work around edge case with protected mode. Make sure created node gets deleted if retries fail during connection loss: see CURATOR-45. The issue is that retries fail due to connection timeout but the node was actually created on the server

Posted by ra...@apache.org.
Work around edge case with protected mode. Make sure the created node gets deleted if retries fail during connection loss: see CURATOR-45.
The issue is that retries can fail due to a connection timeout even though the node was actually created on the server. The client then
believes the create failed while the node still exists. This change therefore attempts to delete the node once the connection is
re-established, which brings the server's state back in line with what the client believes is true.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/4b166a09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/4b166a09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/4b166a09

Branch: refs/heads/master
Commit: 4b166a09eeb054671ed127b20703476b3103b5d1
Parents: 10eb1ef
Author: randgalt <ra...@apache.org>
Authored: Wed Sep 25 09:01:00 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Wed Sep 25 09:06:24 2013 -0700

----------------------------------------------------------------------
 .../framework/imps/CreateBuilderImpl.java       | 352 +++++++++++++------
 .../recipes/leader/ChaosMonkeyCnxnFactory.java  | 124 +++++++
 .../recipes/leader/TestLeaderSelectorEdges.java | 255 ++++++++++++++
 3 files changed, 615 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ee99074..ebd342f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,6 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
 import org.apache.curator.framework.api.transaction.OperationType;
@@ -41,20 +43,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>
 {
-    private final CuratorFrameworkImpl      client;
-    private CreateMode                      createMode;
-    private Backgrounding                   backgrounding;
-    private boolean                         createParentsIfNeeded;
-    private boolean                         doProtected;
-    private boolean                         compress;
-    private String                          protectedId;
-    private ACLing                          acling;
+    private final CuratorFrameworkImpl client;
+    private CreateMode createMode;
+    private Backgrounding backgrounding;
+    private boolean createParentsIfNeeded;
+    private boolean doProtected;
+    private boolean compress;
+    private String protectedId;
+    private ACLing acling;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
 
     @VisibleForTesting
-    static final String         PROTECTED_PREFIX = "_c_";
+    static final String PROTECTED_PREFIX = "_c_";
 
     CreateBuilderImpl(CuratorFrameworkImpl client)
     {
@@ -68,7 +70,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
         protectedId = null;
     }
 
-    TransactionCreateBuilder        asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+    TransactionCreateBuilder asTransactionCreateBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
     {
         return new TransactionCreateBuilder()
         {
@@ -95,7 +97,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             @Override
             public CuratorTransactionBridge forPath(String path, byte[] data) throws Exception
             {
-                String      fixedPath = client.fixForNamespace(path);
+                String fixedPath = client.fixForNamespace(path);
                 transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
                 return curatorTransaction;
             }
@@ -417,60 +419,85 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
             data = client.getCompressionProvider().compress(givenPath, data);
         }
 
-        final String    adjustedPath = adjustPath(client.fixForNamespace(givenPath));
+        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath));
 
-        String  returnPath = null;
+        String returnPath = null;
         if ( backgrounding.inBackground() )
         {
             pathInBackground(adjustedPath, data, givenPath);
         }
         else
         {
-            returnPath = pathInForeground(adjustedPath, data);
-            returnPath = client.unfixForNamespace(returnPath);
+            String path = protectedPathInForeground(adjustedPath, data);
+            returnPath = client.unfixForNamespace(path);
         }
         return returnPath;
     }
 
+    private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
+    {
+        try
+        {
+            return pathInForeground(adjustedPath, data);
+        }
+        catch ( KeeperException.ConnectionLossException e )
+        {
+            if ( protectedId != null )
+            {
+                /*
+                 * CURATOR-45 : we don't know if the create operation was successful or not,
+                 * register the znode to be sure it is deleted later.
+                 */
+                findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
+                /*
+                * The current UUID is scheduled to be deleted, it is not safe to use it again.
+                * If this builder is used again later create a new UUID
+                */
+                protectedId = UUID.randomUUID().toString();
+            }
+            throw e;
+        }
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
     {
-        final TimeTrace   trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
+        final TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Background");
         client.getZooKeeper().create
-        (
-            operationAndData.getData().getPath(),
-            operationAndData.getData().getData(),
-            acling.getAclList(operationAndData.getData().getPath()),
-            createMode,
-            new AsyncCallback.StringCallback()
-            {
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name)
+            (
+                operationAndData.getData().getPath(),
+                operationAndData.getData().getData(),
+                acling.getAclList(operationAndData.getData().getPath()),
+                createMode,
+                new AsyncCallback.StringCallback()
                 {
-                    trace.commit();
-
-                    if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
-                    {
-                        backgroundCreateParentsThenNode(operationAndData);
-                    }
-                    else
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, String name)
                     {
-                        sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        trace.commit();
+
+                        if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
+                        {
+                            backgroundCreateParentsThenNode(operationAndData);
+                        }
+                        else
+                        {
+                            sendBackgroundResponse(rc, path, ctx, name, operationAndData);
+                        }
                     }
-                }
-            },
-            backgrounding.getContext()
-        );
+                },
+                backgrounding.getContext()
+            );
     }
 
-    private String getProtectedPrefix()
+    private static String getProtectedPrefix(String protectedId)
     {
         return PROTECTED_PREFIX + protectedId + "-";
     }
 
     private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
     {
-        BackgroundOperation<PathAndBytes>     operation = new BackgroundOperation<PathAndBytes>()
+        BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
         {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
@@ -486,7 +513,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                 client.queueOperation(mainOperationAndData);
             }
         };
-        OperationAndData<PathAndBytes>        parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
+        OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
         client.queueOperation(parentOperation);
     }
 
@@ -558,16 +585,39 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
     private void pathInBackground(final String path, final byte[] data, final String givenPath)
     {
         final AtomicBoolean firstTime = new AtomicBoolean(true);
-        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext())
+        OperationAndData<PathAndBytes> operationAndData = new OperationAndData<PathAndBytes>(this, new PathAndBytes(path, data), backgrounding.getCallback(),
+            new OperationAndData.ErrorCallback<PathAndBytes>()
+            {
+                public void retriesExhausted(OperationAndData<PathAndBytes> operationAndData)
+                {
+                    if ( doProtected )
+                    {
+                        // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
+                        findAndDeleteProtectedNodeInBackground(path, protectedId, null);
+                        // assign a new id if this builder is used again later
+                        protectedId = UUID.randomUUID().toString();
+                    }
+                }
+            },
+            backgrounding.getContext())
         {
             @Override
             void callPerformBackgroundOperation() throws Exception
             {
-                boolean   callSuper = true;
-                boolean   localFirstTime = firstTime.getAndSet(false);
+                boolean callSuper = true;
+                boolean localFirstTime = firstTime.getAndSet(false);
                 if ( !localFirstTime && doProtected )
                 {
-                    String      createdPath = findProtectedNodeInForeground(path);
+                    String createdPath = null;
+                    try
+                    {
+                        createdPath = findProtectedNodeInForeground(path);
+                    }
+                    catch ( KeeperException.ConnectionLossException e )
+                    {
+                        sendBackgroundResponse(KeeperException.Code.CONNECTIONLOSS.intValue(), path, backgrounding.getContext(), null, this);
+                        callSuper = false;
+                    }
                     if ( createdPath != null )
                     {
                         try
@@ -600,117 +650,187 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
 
     private String pathInForeground(final String path, final byte[] data) throws Exception
     {
-        TimeTrace               trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-Foreground");
 
-        final AtomicBoolean     firstTime = new AtomicBoolean(true);
-        String                  returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        final AtomicBoolean firstTime = new AtomicBoolean(true);
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    boolean   localFirstTime = firstTime.getAndSet(false);
-
-                    String    createdPath = null;
-                    if ( !localFirstTime && doProtected )
+                    @Override
+                    public String call() throws Exception
                     {
-                        createdPath = findProtectedNodeInForeground(path);
-                    }
+                        boolean localFirstTime = firstTime.getAndSet(false);
 
-                    if ( createdPath == null )
-                    {
-                        try
+                        String createdPath = null;
+                        if ( !localFirstTime && doProtected )
                         {
-                            createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                            createdPath = findProtectedNodeInForeground(path);
                         }
-                        catch ( KeeperException.NoNodeException e )
+
+                        if ( createdPath == null )
                         {
-                            if ( createParentsIfNeeded )
+                            try
                             {
-                                ZKPaths.mkdirs(client.getZooKeeper(), path, false);
                                 createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                             }
-                            else
+                            catch ( KeeperException.NoNodeException e )
                             {
-                                throw e;
+                                if ( createParentsIfNeeded )
+                                {
+                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false);
+                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
+                                }
+                                else
+                                {
+                                    throw e;
+                                }
                             }
                         }
-                    }
 
-                    if ( failNextCreateForTesting )
-                    {
-                        failNextCreateForTesting = false;
-                        throw new KeeperException.ConnectionLossException();
+                        if ( failNextCreateForTesting )
+                        {
+                            failNextCreateForTesting = false;
+                            throw new KeeperException.ConnectionLossException();
+                        }
+                        return createdPath;
                     }
-                    return createdPath;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  findProtectedNodeInForeground(final String path) throws Exception
+    private String findProtectedNodeInForeground(final String path) throws Exception
     {
-        TimeTrace       trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("CreateBuilderImpl-findProtectedNodeInForeground");
 
-        String          returnPath = RetryLoop.callWithRetry
-        (
-            client.getZookeeperClient(),
-            new Callable<String>()
-            {
-                @Override
-                public String call() throws Exception
+        String returnPath = RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(),
+                new Callable<String>()
                 {
-                    String foundNode = null;
-                    try
+                    @Override
+                    public String call() throws Exception
                     {
-                        final ZKPaths.PathAndNode   pathAndNode = ZKPaths.getPathAndNode(path);
-                        List<String>                children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
-
-                        final String                protectedPrefix = getProtectedPrefix();
-                        foundNode = Iterables.find
-                        (
-                            children,
-                            new Predicate<String>()
-                            {
-                                @Override
-                                public boolean apply(String node)
-                                {
-                                    return node.startsWith(protectedPrefix);
-                                }
-                            },
-                            null
-                        );
-                        if ( foundNode != null )
+                        String foundNode = null;
+                        try
                         {
-                            foundNode = ZKPaths.makePath(pathAndNode.getPath(), foundNode);
+                            final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+                            List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
+
+                            foundNode = findNode(children, pathAndNode.getPath(), protectedId);
                         }
+                        catch ( KeeperException.NoNodeException ignore )
+                        {
+                            // ignore
+                        }
+                        return foundNode;
                     }
-                    catch ( KeeperException.NoNodeException ignore )
-                    {
-                        // ignore
-                    }
-                    return foundNode;
                 }
-            }
-        );
+            );
 
         trace.commit();
         return returnPath;
     }
 
-    private String  adjustPath(String path) throws Exception
+    private String adjustPath(String path) throws Exception
     {
         if ( doProtected )
         {
-            ZKPaths.PathAndNode     pathAndNode = ZKPaths.getPathAndNode(path);
-            String                  name = getProtectedPrefix() + pathAndNode.getNode();
+            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
             path = ZKPaths.makePath(pathAndNode.getPath(), name);
         }
         return path;
     }
+
+    /**
+     * Attempt to delete a protected znode
+     *
+     * @param path        the path
+     * @param protectedId the protected id
+     * @param callback    callback to use, <code>null</code> to create a new one
+     */
+    private void findAndDeleteProtectedNodeInBackground(String path, String protectedId, FindProtectedNodeCB callback)
+    {
+        if ( client.isStarted() )
+        {
+            if ( callback == null )
+            {
+                callback = new FindProtectedNodeCB(path, protectedId);
+            }
+            try
+            {
+                client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(path).getPath());
+            }
+            catch ( Exception e )
+            {
+                findAndDeleteProtectedNodeInBackground(path, protectedId, callback);
+            }
+        }
+    }
+
+    private class FindProtectedNodeCB implements BackgroundCallback
+    {
+        final String path;
+        final String protectedId;
+
+        private FindProtectedNodeCB(String path, String protectedId)
+        {
+            this.path = path;
+            this.protectedId = protectedId;
+        }
+
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+            {
+                final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(), protectedId);
+                if ( node != null )
+                {
+                    client.delete().guaranteed().inBackground().forPath(node);
+                }
+            }
+            else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+            {
+                // retry
+                findAndDeleteProtectedNodeInBackground(path, protectedId, this);
+            }
+        }
+    }
+
+    /**
+     * Attempt to find the znode that matches the given path and protected id
+     *
+     * @param children    a list of candidates znodes
+     * @param path        the path
+     * @param protectedId the protected id
+     * @return the absolute path of the znode or <code>null</code> if it is not found
+     */
+    private static String findNode(final List<String> children, final String path, final String protectedId)
+    {
+        final String protectedPrefix = getProtectedPrefix(protectedId);
+        String foundNode = Iterables.find
+            (
+                children,
+                new Predicate<String>()
+                {
+                    @Override
+                    public boolean apply(String node)
+                    {
+                        return node.startsWith(protectedPrefix);
+                    }
+                },
+                null
+            );
+        if ( foundNode != null )
+        {
+            foundNode = ZKPaths.makePath(path, foundNode);
+        }
+        return foundNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
new file mode 100644
index 0000000..5f10c5e
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A connection factory that will behave like the NIOServerCnxnFactory except that
+ * it will unexpectedly close the connection right after the <b>first</b> znode has
+ * been created in Zookeeper.
+ * Subsequent create operations will succeed.
+ */
+public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory
+{
+    public static final String CHAOS_ZNODE = "/mylock";
+    public static final String CHAOS_ZNODE_PREFIX = CHAOS_ZNODE + "/";
+
+    private static final Logger log = LoggerFactory.getLogger(ChaosMonkeyCnxnFactory.class);
+
+    /* How long after the first error, connections are rejected */
+    public static final long LOCKOUT_DURATION_MS = 6000;
+
+    public ChaosMonkeyCnxnFactory() throws IOException
+    {
+    }
+
+    @Override
+    public void startup(ZooKeeperServer zks) throws IOException, InterruptedException
+    {
+        super.startup(new ChaosMonkeyZookeeperServer(zks));
+    }
+
+    /**
+     * Build a connection with a Chaos Monkey ZookeeperServer
+     */
+    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
+    {
+        return new NIOServerCnxn(zkServer, sock, sk, this);
+    }
+
+    public static class ChaosMonkeyZookeeperServer extends ZooKeeperServer
+    {
+        private long firstError = 0;
+
+        public ChaosMonkeyZookeeperServer(ZooKeeperServer zks)
+        {
+            setTxnLogFactory(zks.getTxnLogFactory());
+            setTickTime(zks.getTickTime());
+            setMinSessionTimeout(zks.getMinSessionTimeout());
+            setMaxSessionTimeout(zks.getMaxSessionTimeout());
+        }
+
+        @Override
+        public void submitRequest(Request si)
+        {
+            long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
+            if ( si.type != ZooDefs.OpCode.createSession && si.type != ZooDefs.OpCode.sync && si.type != ZooDefs.OpCode.ping
+                && firstError != 0 && remaining > 0 )
+            {
+                log.debug("Rejected : " + si.toString());
+                // Still reject request
+                log.debug("Still not ready for " + remaining + "ms");
+                ((NIOServerCnxn)si.cnxn).close();
+                return;
+            }
+            // Submit the request to the legacy Zookeeper server
+            log.debug("Applied : " + si.toString());
+            super.submitRequest(si);
+            // Raise an error if a lock is created
+            if ( si.type == ZooDefs.OpCode.create )
+            {
+                CreateRequest createRequest = new CreateRequest();
+                try
+                {
+                    ByteBuffer duplicate = si.request.duplicate();
+                    duplicate.rewind();
+                    ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
+                    if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX)
+                        && firstError == 0 )
+                    {
+                        firstError = System.currentTimeMillis();
+                        // The znode has been created, close the connection and don't tell it to client
+                        log.warn("Closing connection right after " + createRequest.getPath() + " creation");
+                        ((NIOServerCnxn)si.cnxn).close();
+                    }
+                }
+                catch ( Exception e )
+                {
+                    // Should not happen
+                    ((NIOServerCnxn)si.cnxn).close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/4b166a09/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
new file mode 100644
index 0000000..e89c958
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelectorEdges.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases designed after CURATOR-45
+ */
+public class TestLeaderSelectorEdges extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Install {@link ChaosMonkeyCnxnFactory} as the ZooKeeper server connection factory.
+     */
+    @BeforeClass
+    public static void setCNXFactory()
+    {
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName());
+    }
+
+    /**
+     * Remove the connection factory override installed by {@link #setCNXFactory()}.
+     */
+    @AfterClass
+    public static void resetCNXFactory()
+    {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+
+    /**
+     * Create a LeaderSelector but close the connection right after the "lock" znode
+     * has been created. The orphaned lock znode must be gone after the lockout and a
+     * second LeaderSelector must then be able to take leadership.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void flappingTest() throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryNTimes(1, 500))
+                .sessionTimeoutMs(30000)
+                .build();
+
+        final TestLeaderSelectorListener listener = new TestLeaderSelectorListener();
+        LeaderSelector leaderSelector1 =
+            new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
+        LeaderSelector leaderSelector2 = null;
+
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            leaderSelector1.start();
+            // At this point the ChaosMonkeyZookeeperServer must close the connection
+            // right after the lock znode is created.
+            Assert.assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
+            // Check that leadership has not been taken
+            Assert.assertEquals(listener.takeLeadership.getCount(), 1);
+            // Wait past the server lockout so the orphaned lock znode can be deleted
+            Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+            // Check that there is no znode
+            assertNoLockChildren(client);
+            // Check that a new LeaderSelector can be started
+            leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
+            leaderSelector2.start();
+            // Same 10 second budget as the reconnect wait above, rather than a tight 1 second window
+            Assert.assertTrue(listener.takeLeadership.await(10, TimeUnit.SECONDS), "Leadership has not been taken");
+        }
+        finally
+        {
+            try
+            {
+                leaderSelector1.close();
+            }
+            catch ( IllegalStateException e )
+            {
+                Assert.fail(e.getMessage());
+            }
+            try
+            {
+                if ( leaderSelector2 != null )
+                {
+                    leaderSelector2.close();
+                }
+            }
+            catch ( IllegalStateException e )
+            {
+                Assert.fail(e.getMessage());
+            }
+            client.close();
+        }
+    }
+
+    /**
+     * Listener that counts down {@link #takeLeadership} when leadership is granted and
+     * {@link #reconnected} on the first RECONNECTED state change.
+     */
+    private class TestLeaderSelectorListener implements LeaderSelectorListener
+    {
+        final CountDownLatch takeLeadership = new CountDownLatch(1);
+        final CountDownLatch reconnected = new CountDownLatch(1);
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception
+        {
+            log.info("-->takeLeadership({})", client.toString());
+            takeLeadership.countDown();
+            log.info("<--takeLeadership({})", client.toString());
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                reconnected.countDown();
+            }
+        }
+
+    }
+
+    /**
+     * Create a protected node in background with a retry policy
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTest() throws Exception
+    {
+        checkProtectedNodeInBackground(new RetryNTimes(2, 1));
+    }
+
+    /**
+     * Same scenario as {@link #createProtectedNodeInBackgroundTest()} but with retries disabled
+     */
+    @Test
+    public void createProtectedNodeInBackgroundTestNoRetry() throws Exception
+    {
+        checkProtectedNodeInBackground(new RetryNTimes(0, 0));
+    }
+
+    /**
+     * Create a protected EPHEMERAL_SEQUENTIAL znode in background while the chaos server
+     * drops the connection, then check that the callback observed CONNECTIONLOSS and that
+     * no znode is left under {@link ChaosMonkeyCnxnFactory#CHAOS_ZNODE} after the lockout.
+     *
+     * @param retryPolicy retry policy for the client under test
+     * @throws Exception on any client failure
+     */
+    private void checkProtectedNodeInBackground(RetryNTimes retryPolicy) throws Exception
+    {
+        final CuratorFramework client =
+            CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(retryPolicy)
+                .connectionTimeoutMs(100)
+                .sessionTimeoutMs(60000)
+                .build();
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.start();
+        try
+        {
+            client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
+            client.create()
+                .withProtection()
+                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+                .inBackground(
+                    new BackgroundCallback()
+                    {
+                        public void processResult(CuratorFramework client, CuratorEvent event)
+                            throws Exception
+                        {
+                            log.info("Receive event {}", event.toString());
+                            if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() )
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                             )
+                .forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
+
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
+            // Wait for the znode to be deleted
+            Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
+            // Check that there is no znode
+            assertNoLockChildren(client);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    /**
+     * Assert that no znodes remain under {@link ChaosMonkeyCnxnFactory#CHAOS_ZNODE}.
+     *
+     * @param client a started client connected to the test server
+     * @throws Exception if the children cannot be read
+     */
+    private static void assertNoLockChildren(CuratorFramework client) throws Exception
+    {
+        final int children = client.getChildren().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE).size();
+        Assert.assertEquals(children, 0,
+            "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
+    }
+}


[3/3] git commit: Merge branch 'CURATOR-45' of https://git-wip-us.apache.org/repos/asf/incubator-curator into CURATOR-45

Posted by ra...@apache.org.
Merge branch 'CURATOR-45' of https://git-wip-us.apache.org/repos/asf/incubator-curator into CURATOR-45


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9460a467
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9460a467
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9460a467

Branch: refs/heads/master
Commit: 9460a467e11ddcedc4349edd2212164cddc6955f
Parents: 4b166a0 9705b41
Author: randgalt <ra...@apache.org>
Authored: Wed Sep 25 09:08:35 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Wed Sep 25 09:08:35 2013 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------