You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/04/22 01:07:39 UTC

[42/50] curator git commit: Guaranteed deletes must accept InterruptedException as well as retryable exceptions

Guaranteed deletes must accept InterruptedException as well as retryable exceptions


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/989f9414
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/989f9414
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/989f9414

Branch: refs/heads/CURATOR-154
Commit: 989f94148faae97f23368e4b5bae2f1f03eaa62c
Parents: 6a56c51
Author: randgalt <ra...@apache.org>
Authored: Sun Apr 19 13:13:45 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Apr 19 13:13:45 2015 -0500

----------------------------------------------------------------------
 .../framework/imps/DeleteBuilderImpl.java       | 110 ++++++++++---------
 .../recipes/leader/TestLeaderSelector.java      |  67 +++++++++++
 2 files changed, 127 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 5d8b846..c067357 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.RetryLoop;
@@ -40,11 +41,11 @@ import java.util.concurrent.Executor;
 
 class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
 {
-    private final CuratorFrameworkImpl  client;
-    private int                         version;
-    private Backgrounding               backgrounding;
-    private boolean                     deletingChildrenIfNeeded;
-    private boolean                     guaranteed;
+    private final CuratorFrameworkImpl client;
+    private int version;
+    private Backgrounding backgrounding;
+    private boolean deletingChildrenIfNeeded;
+    private boolean guaranteed;
 
     DeleteBuilderImpl(CuratorFrameworkImpl client)
     {
@@ -55,14 +56,14 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
         guaranteed = false;
     }
 
-    TransactionDeleteBuilder    asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+    TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
     {
         return new TransactionDeleteBuilder()
         {
             @Override
             public CuratorTransactionBridge forPath(String path) throws Exception
             {
-                String      fixedPath = client.fixForNamespace(path);
+                String fixedPath = client.fixForNamespace(path);
                 transaction.add(Op.delete(fixedPath, version), OperationType.DELETE, path);
                 return curatorTransaction;
             }
@@ -142,32 +143,35 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
     {
-        final TimeTrace   trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background");
+        final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background");
         client.getZooKeeper().delete
-        (
-            operationAndData.getData(),
-            version,
-            new AsyncCallback.VoidCallback()
-            {
-                @Override
-                public void processResult(int rc, String path, Object ctx)
+            (
+                operationAndData.getData(),
+                version,
+                new AsyncCallback.VoidCallback()
                 {
-                    trace.commit();
-                    if ((rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded) {
-                        backgroundDeleteChildrenThenNode(operationAndData);
-                    } else {
-                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
-                        client.processBackgroundOperation(operationAndData, event);
+                    @Override
+                    public void processResult(int rc, String path, Object ctx)
+                    {
+                        trace.commit();
+                        if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded )
+                        {
+                            backgroundDeleteChildrenThenNode(operationAndData);
+                        }
+                        else
+                        {
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
+                            client.processBackgroundOperation(operationAndData, event);
+                        }
                     }
-                }
-            },
-            backgrounding.getContext()
-        );
+                },
+                backgrounding.getContext()
+            );
     }
 
     private void backgroundDeleteChildrenThenNode(final OperationAndData<String> mainOperationAndData)
     {
-        BackgroundOperation<String>     operation = new BackgroundOperation<String>()
+        BackgroundOperation<String> operation = new BackgroundOperation<String>()
         {
             @Override
             public void performBackgroundOperation(OperationAndData<String> dummy) throws Exception
@@ -190,12 +194,12 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
     @Override
     public Void forPath(String path) throws Exception
     {
-        final String        unfixedPath = path;
+        final String unfixedPath = path;
         path = client.fixForNamespace(path);
 
         if ( backgrounding.inBackground() )
         {
-            OperationAndData.ErrorCallback<String>  errorCallback = null;
+            OperationAndData.ErrorCallback<String> errorCallback = null;
             if ( guaranteed )
             {
                 errorCallback = new OperationAndData.ErrorCallback<String>()
@@ -223,35 +227,41 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
 
     private void pathInForeground(final String path, String unfixedPath) throws Exception
     {
-        TimeTrace       trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
+        TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
         try
         {
             RetryLoop.callWithRetry
-            (
-                client.getZookeeperClient(),
-                new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
+                (
+                    client.getZookeeperClient(),
+                    new Callable<Void>()
                     {
-                        try {
-                        client.getZooKeeper().delete(path, version);
-                        } catch (KeeperException.NotEmptyException e) {
-                            if (deletingChildrenIfNeeded) {
-                                ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
-                            } else {
-                                throw e;
+                        @Override
+                        public Void call() throws Exception
+                        {
+                            try
+                            {
+                                client.getZooKeeper().delete(path, version);
                             }
+                            catch ( KeeperException.NotEmptyException e )
+                            {
+                                if ( deletingChildrenIfNeeded )
+                                {
+                                    ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
+                                }
+                                else
+                                {
+                                    throw e;
+                                }
+                            }
+                            return null;
                         }
-                        return null;
                     }
-                }
-            );
-        }      
+                );
+        }
         catch ( Exception e )
         {
             //Only retry a guaranteed delete if it's a retryable error
-            if( RetryLoop.isRetryException(e) && guaranteed )
+            if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
             {
                 client.getFailedDeleteManager().addFailedDelete(unfixedPath);
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index ec909f7..c7f415c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
 import org.testng.internal.annotations.Sets;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -48,6 +50,71 @@ public class TestLeaderSelector extends BaseClassForTests
     private static final String PATH_NAME = "/one/two/me";
 
     @Test
+    public void testLeaderNodeDeleteOnInterrupt() throws Exception
+    {
+        Timing timing = new Timing();
+        LeaderSelector selector = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(connectionStateListener);
+            client.start();
+
+            final BlockingQueue<Thread> queue = new ArrayBlockingQueue<Thread>(1);
+            LeaderSelectorListener listener = new LeaderSelectorListener()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    queue.add(Thread.currentThread());
+                    try
+                    {
+                        Thread.currentThread().join();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listener);
+            selector.start();
+
+            Thread leaderThread = queue.take();
+            server.stop();
+            leaderThread.interrupt();
+            server.restart();
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+            timing.sleepABit();
+
+            Assert.assertEquals(client.getChildren().forPath("/leader").size(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(selector);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testInterruptLeadershipWithRequeue() throws Exception
     {
         Timing timing = new Timing();