You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/04/19 20:13:58 UTC
curator git commit: Guaranteed deletes must accept
InterruptedException as well as retryable exceptions
Repository: curator
Updated Branches:
refs/heads/CURATOR-202 [created] 989f94148
Guaranteed deletes must accept InterruptedException as well as retryable exceptions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/989f9414
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/989f9414
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/989f9414
Branch: refs/heads/CURATOR-202
Commit: 989f94148faae97f23368e4b5bae2f1f03eaa62c
Parents: 6a56c51
Author: randgalt <ra...@apache.org>
Authored: Sun Apr 19 13:13:45 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Apr 19 13:13:45 2015 -0500
----------------------------------------------------------------------
.../framework/imps/DeleteBuilderImpl.java | 110 ++++++++++---------
.../recipes/leader/TestLeaderSelector.java | 67 +++++++++++
2 files changed, 127 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 5d8b846..c067357 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import org.apache.curator.RetryLoop;
@@ -40,11 +41,11 @@ import java.util.concurrent.Executor;
class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
{
- private final CuratorFrameworkImpl client;
- private int version;
- private Backgrounding backgrounding;
- private boolean deletingChildrenIfNeeded;
- private boolean guaranteed;
+ private final CuratorFrameworkImpl client;
+ private int version;
+ private Backgrounding backgrounding;
+ private boolean deletingChildrenIfNeeded;
+ private boolean guaranteed;
DeleteBuilderImpl(CuratorFrameworkImpl client)
{
@@ -55,14 +56,14 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
guaranteed = false;
}
- TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
+ TransactionDeleteBuilder asTransactionDeleteBuilder(final CuratorTransactionImpl curatorTransaction, final CuratorMultiTransactionRecord transaction)
{
return new TransactionDeleteBuilder()
{
@Override
public CuratorTransactionBridge forPath(String path) throws Exception
{
- String fixedPath = client.fixForNamespace(path);
+ String fixedPath = client.fixForNamespace(path);
transaction.add(Op.delete(fixedPath, version), OperationType.DELETE, path);
return curatorTransaction;
}
@@ -142,32 +143,35 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
- final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background");
+ final TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Background");
client.getZooKeeper().delete
- (
- operationAndData.getData(),
- version,
- new AsyncCallback.VoidCallback()
- {
- @Override
- public void processResult(int rc, String path, Object ctx)
+ (
+ operationAndData.getData(),
+ version,
+ new AsyncCallback.VoidCallback()
{
- trace.commit();
- if ((rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded) {
- backgroundDeleteChildrenThenNode(operationAndData);
- } else {
- CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
- client.processBackgroundOperation(operationAndData, event);
+ @Override
+ public void processResult(int rc, String path, Object ctx)
+ {
+ trace.commit();
+ if ( (rc == KeeperException.Code.NOTEMPTY.intValue()) && deletingChildrenIfNeeded )
+ {
+ backgroundDeleteChildrenThenNode(operationAndData);
+ }
+ else
+ {
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
+ client.processBackgroundOperation(operationAndData, event);
+ }
}
- }
- },
- backgrounding.getContext()
- );
+ },
+ backgrounding.getContext()
+ );
}
private void backgroundDeleteChildrenThenNode(final OperationAndData<String> mainOperationAndData)
{
- BackgroundOperation<String> operation = new BackgroundOperation<String>()
+ BackgroundOperation<String> operation = new BackgroundOperation<String>()
{
@Override
public void performBackgroundOperation(OperationAndData<String> dummy) throws Exception
@@ -190,12 +194,12 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
@Override
public Void forPath(String path) throws Exception
{
- final String unfixedPath = path;
+ final String unfixedPath = path;
path = client.fixForNamespace(path);
if ( backgrounding.inBackground() )
{
- OperationAndData.ErrorCallback<String> errorCallback = null;
+ OperationAndData.ErrorCallback<String> errorCallback = null;
if ( guaranteed )
{
errorCallback = new OperationAndData.ErrorCallback<String>()
@@ -223,35 +227,41 @@ class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>
private void pathInForeground(final String path, String unfixedPath) throws Exception
{
- TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
+ TimeTrace trace = client.getZookeeperClient().startTracer("DeleteBuilderImpl-Foreground");
try
{
RetryLoop.callWithRetry
- (
- client.getZookeeperClient(),
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
+ (
+ client.getZookeeperClient(),
+ new Callable<Void>()
{
- try {
- client.getZooKeeper().delete(path, version);
- } catch (KeeperException.NotEmptyException e) {
- if (deletingChildrenIfNeeded) {
- ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
- } else {
- throw e;
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ client.getZooKeeper().delete(path, version);
}
+ catch ( KeeperException.NotEmptyException e )
+ {
+ if ( deletingChildrenIfNeeded )
+ {
+ ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ return null;
}
- return null;
}
- }
- );
- }
+ );
+ }
catch ( Exception e )
{
//Only retry a guaranteed delete if it's a retryable error
- if( RetryLoop.isRetryException(e) && guaranteed )
+ if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
{
client.getFailedDeleteManager().addFailedDelete(unfixedPath);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/989f9414/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index ec909f7..c7f415c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
import org.testng.internal.annotations.Sets;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -48,6 +50,71 @@ public class TestLeaderSelector extends BaseClassForTests
private static final String PATH_NAME = "/one/two/me";
@Test
+ public void testLeaderNodeDeleteOnInterrupt() throws Exception
+ {
+ Timing timing = new Timing();
+ LeaderSelector selector = null;
+ CuratorFramework client = null;
+ try
+ {
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectedLatch.countDown();
+ }
+ }
+ };
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ client.start();
+
+ final BlockingQueue<Thread> queue = new ArrayBlockingQueue<Thread>(1);
+ LeaderSelectorListener listener = new LeaderSelectorListener()
+ {
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception
+ {
+ queue.add(Thread.currentThread());
+ try
+ {
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ }
+ };
+ selector = new LeaderSelector(client, "/leader", listener);
+ selector.start();
+
+ Thread leaderThread = queue.take();
+ server.stop();
+ leaderThread.interrupt();
+ server.restart();
+ Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ timing.sleepABit();
+
+ Assert.assertEquals(client.getChildren().forPath("/leader").size(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(selector);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testInterruptLeadershipWithRequeue() throws Exception
{
Timing timing = new Timing();