You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ke...@apache.org on 2023/08/15 15:23:57 UTC

[curator] branch master updated: CURATOR-677: Complete BackgroundCallback if sub operation failed or cancelled (#467)

This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new bf587436 CURATOR-677: Complete BackgroundCallback if sub operation failed or cancelled (#467)
bf587436 is described below

commit bf587436c085d0361e881f919de64acb8424b1e5
Author: Kezhu Wang <ke...@apache.org>
AuthorDate: Tue Aug 15 23:23:53 2023 +0800

    CURATOR-677: Complete BackgroundCallback if sub operation failed or cancelled (#467)
    
    Currently, some background operations use auxiliary sub-operations to
    complete the task when the primary conditions are not satisfied. But
    most of these sub-operations handle only the success path, so they leave
    the `BackgroundCallback` hanging if they fail or are cancelled because
    the framework was closed.
    
    This addresses the cases left over from [CURATOR-673][] (#464).
    
    [CURATOR-673]: https://issues.apache.org/jira/browse/CURATOR-673
---
 .../curator/framework/imps/CreateBuilderImpl.java  |  36 ++-----
 .../framework/imps/CuratorFrameworkImpl.java       |  15 ++-
 .../curator/framework/imps/DeleteBuilderImpl.java  |   3 +-
 .../curator/framework/imps/ExistsBuilderImpl.java  |   1 -
 .../curator/framework/imps/OperationAndData.java   |  13 ++-
 .../curator/framework/imps/SetDataBuilderImpl.java |   9 +-
 .../curator/framework/imps/TestFramework.java      | 118 ++++++++++++++++++++-
 7 files changed, 154 insertions(+), 41 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ef765769..fa208141 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -608,7 +608,6 @@ public class CreateBuilderImpl
                                 client,
                                 operationAndData,
                                 operationAndData.getData().getPath(),
-                                backgrounding,
                                 acling.getACLProviderForParents(),
                                 createParentsAsContainers);
                     } else if ((rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists) {
@@ -726,7 +725,6 @@ public class CreateBuilderImpl
             final CuratorFrameworkImpl client,
             final OperationAndData<T> mainOperationAndData,
             final String path,
-            Backgrounding backgrounding,
             final InternalACLProvider aclProvider,
             final boolean createParentsAsContainers) {
         BackgroundOperation<T> operation = new BackgroundOperation<T>() {
@@ -736,8 +734,6 @@ public class CreateBuilderImpl
                     ZKPaths.mkdirs(client.getZooKeeper(), path, false, aclProvider, createParentsAsContainers);
                 } catch (KeeperException e) {
                     if (!client.getZookeeperClient().getRetryPolicy().allowRetry(e)) {
-                        sendBackgroundResponse(
-                                client, e.code().intValue(), e.getPath(), null, null, null, mainOperationAndData);
                         throw e;
                     }
                     // otherwise safe to ignore as it will get retried
@@ -750,8 +746,7 @@ public class CreateBuilderImpl
                 return CuratorEventType.CREATE;
             }
         };
-        OperationAndData<T> parentOperation = new OperationAndData<>(
-                operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null);
+        OperationAndData<T> parentOperation = new OperationAndData<>(operation, mainOperationAndData);
         client.queueOperation(parentOperation);
     }
 
@@ -773,17 +768,13 @@ public class CreateBuilderImpl
         BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception {
-                try {
-                    client.getZooKeeper()
-                            .setData(
-                                    path,
-                                    mainOperationAndData.getData().getData(),
-                                    setDataIfExistsVersion,
-                                    statCallback,
-                                    backgrounding.getContext());
-                } catch (KeeperException e) {
-                    // ignore
-                }
+                client.getZooKeeper()
+                        .setData(
+                                path,
+                                mainOperationAndData.getData().getData(),
+                                setDataIfExistsVersion,
+                                statCallback,
+                                backgrounding.getContext());
             }
 
             @Override
@@ -791,7 +782,7 @@ public class CreateBuilderImpl
                 return CuratorEventType.CREATE;
             }
         };
-        client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
+        client.queueOperation(new OperationAndData<>(operation, mainOperationAndData));
     }
 
     private void backgroundCheckIdempotent(
@@ -821,12 +812,7 @@ public class CreateBuilderImpl
         BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception {
-                try {
-                    client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext());
-                } catch (KeeperException e) {
-                    // ignore
-                    client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e);
-                }
+                client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext());
             }
 
             @Override
@@ -834,7 +820,7 @@ public class CreateBuilderImpl
                 return CuratorEventType.CREATE;
             }
         };
-        client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
+        client.queueOperation(new OperationAndData<>(operation, mainOperationAndData));
     }
 
     private void sendBackgroundResponse(
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 019897b3..dd62006d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -431,9 +431,13 @@ public class CuratorFrameworkImpl implements CuratorFramework {
             if (ensembleTracker != null) {
                 ensembleTracker.close();
             }
-            OperationAndData<?>[] droppedOperations = backgroundOperations.toArray(new OperationAndData<?>[0]);
-            backgroundOperations.clear();
-            Arrays.stream(droppedOperations).forEach(this::closeOperation);
+            // Operations are forbidden to queue after closing, but there are still other concurrent mutations,
+            // say, un-sleeping and not fully terminated background thread. So we have to drain the queue atomically
+            // to avoid duplicated close. But DelayQueue counts Delayed::getDelay, so we have to clear it up front.
+            backgroundOperations.forEach(OperationAndData::clearSleep);
+            Collection<OperationAndData<?>> droppedOperations = new ArrayList<>(backgroundOperations.size());
+            backgroundOperations.drainTo(droppedOperations);
+            droppedOperations.forEach(this::closeOperation);
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();
@@ -745,7 +749,10 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                 return;
             }
         }
-        closeOperation(operationAndData);
+        // Sleeping operations are queued with delay, it could have been pulled out for execution or cancellation.
+        if (backgroundOperations.remove(operationAndData)) {
+            closeOperation(operationAndData);
+        }
     }
 
     /**
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 31dbf911..958879f5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -230,8 +230,7 @@ public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<Str
                 return CuratorEventType.DELETE;
             }
         };
-        OperationAndData<String> parentOperation = new OperationAndData<String>(
-                operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null);
+        OperationAndData<String> parentOperation = new OperationAndData<>(operation, mainOperationAndData);
         client.queueOperation(parentOperation);
     }
 
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 102ab823..68494c48 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -193,7 +193,6 @@ public class ExistsBuilderImpl
                         client,
                         operationAndData,
                         operationAndData.getData(),
-                        backgrounding,
                         acling.getACLProviderForParents(),
                         createParentContainersIfNeeded);
             } else {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index be15d882..19e89c8e 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -36,7 +36,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper {
     private final BackgroundCallback callback;
     private final long startTimeMs = System.currentTimeMillis();
     private final ErrorCallback<T> errorCallback;
-    private final AtomicInteger retryCount = new AtomicInteger(0);
+    private final AtomicInteger retryCount;
     private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
     private final AtomicLong ordinal = new AtomicLong();
     private final Object context;
@@ -46,6 +46,16 @@ class OperationAndData<T> implements Delayed, RetrySleeper {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
 
+    OperationAndData(BackgroundOperation<T> operation, OperationAndData<T> main) {
+        this.operation = operation;
+        this.data = main.data;
+        this.callback = main.callback;
+        this.errorCallback = main.errorCallback;
+        this.context = main.context;
+        this.connectionRequired = main.connectionRequired;
+        this.retryCount = main.retryCount;
+    }
+
     OperationAndData(
             BackgroundOperation<T> operation,
             T data,
@@ -59,6 +69,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper {
         this.errorCallback = errorCallback;
         this.context = context;
         this.connectionRequired = connectionRequired;
+        this.retryCount = new AtomicInteger(0);
         reset();
     }
 
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 325053aa..73ba8a54 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -229,12 +229,7 @@ public class SetDataBuilderImpl
         BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>() {
             @Override
             public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws Exception {
-                try {
-                    client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext());
-                } catch (KeeperException e) {
-                    // ignore
-                    client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e);
-                }
+                client.getZooKeeper().getData(path, false, dataCallback, backgrounding.getContext());
             }
 
             @Override
@@ -242,7 +237,7 @@ public class SetDataBuilderImpl
                 return CuratorEventType.SET_DATA;
             }
         };
-        client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
+        client.queueOperation(new OperationAndData<>(operation, mainOperationAndData));
     }
 
     @Override
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index ea7fc048..d28b814e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.RetrySleeper;
 import org.apache.curator.framework.AuthInfo;
@@ -60,6 +61,7 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
@@ -917,6 +919,16 @@ public class TestFramework extends BaseClassForTests {
         BackgroundOperation<?> create(CuratorFramework client, CompletableFuture<CuratorEvent> future) throws Exception;
     }
 
+    private static class CountingCompletableFuture<T> extends CompletableFuture<T> {
+        private final AtomicInteger completes = new AtomicInteger();
+
+        @Override
+        public boolean complete(T value) {
+            completes.incrementAndGet();
+            return super.complete(value);
+        }
+    }
+
     private void testBackgroundOperationWithConcurrentCloseAndChaosStalls(
             BackgroundOperationFactory operationFactory, long maxRuns, long[] millisStalls) throws Exception {
         AlwaysRetry alwaysRetry = new AlwaysRetry(2);
@@ -924,7 +936,7 @@ public class TestFramework extends BaseClassForTests {
         client.start();
         try {
             // given: error background request with always-retry policy
-            CompletableFuture<CuratorEvent> future = new CompletableFuture<>();
+            CountingCompletableFuture<CuratorEvent> future = new CountingCompletableFuture<>();
             BackgroundOperation<?> operation = operationFactory.create(client, future);
 
             // These chaos steps create chances to run into concurrent contentions.
@@ -948,6 +960,7 @@ public class TestFramework extends BaseClassForTests {
             assertThat(event.getResultCode()).isEqualTo(KeeperException.Code.SESSIONEXPIRED.intValue());
             assertThat(event.getType()).isSameAs(operation.getBackgroundEventType());
             assertThat(event.getContext()).isSameAs(future);
+            assertThat(future.completes.get()).isEqualTo(1);
         } finally {
             CloseableUtils.closeQuietly(client);
         }
@@ -957,6 +970,7 @@ public class TestFramework extends BaseClassForTests {
             throws Exception {
         testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {20, -1, 5});
         testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {10});
+        testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {200});
         testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, 2, new long[] {20});
     }
 
@@ -978,6 +992,67 @@ public class TestFramework extends BaseClassForTests {
         });
     }
 
+    @Test
+    public void testBackgroundCreateSetDataIfExistsWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean();
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) {
+                try {
+                    client.create().forPath("/exist-path");
+                } catch (KeeperException ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            CreateBuilder create = client.create();
+            create.orSetData(Integer.MAX_VALUE)
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/exist-path");
+            return (BackgroundOperation<?>) create;
+        });
+    }
+
+    @Test
+    public void testBackgroundCreateIdempotentWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean();
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) {
+                try {
+                    client.create().forPath("/exist-path", "some-data".getBytes());
+                } catch (KeeperException ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            CreateBuilder create = client.create();
+            create.idempotent()
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/exist-path", "different-data".getBytes());
+            return (BackgroundOperation<?>) create;
+        });
+    }
+
+    @Test
+    public void testBackgroundCreateParentsIfNeedWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean();
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) {
+                try {
+                    // Disable CREATE child permission for grandparent path.
+                    client.create()
+                            .withACL(Collections.singletonList(
+                                    new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)))
+                            .forPath("/grandparent");
+                } catch (KeeperException ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            CreateBuilder create = client.create();
+            create.creatingParentsIfNeeded()
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/grandparent/parent/child");
+            return (BackgroundOperation<?>) create;
+        });
+    }
+
     @Test
     public void testBackgroundDeleteWithConcurrentClose() throws Exception {
         testBackgroundOperationWithConcurrentClose((client, future) -> {
@@ -988,6 +1063,31 @@ public class TestFramework extends BaseClassForTests {
         });
     }
 
+    @Test
+    public void testBackgroundDeleteNotEmptyAndACLWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean();
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) {
+                try (CuratorFramework authedClient = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .authorization("digest", "me1:pass1".getBytes())
+                        .retryPolicy(new RetryForever(2))
+                        .build()) {
+                    client.create().forPath("/not-empty-path");
+                    authedClient.start();
+                    authedClient.create().withACL(ZooDefs.Ids.CREATOR_ALL_ACL).forPath("/not-empty-path/child");
+                } catch (KeeperException ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            DeleteBuilder delete = client.delete();
+            delete.deletingChildrenIfNeeded()
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-empty-path");
+            return (BackgroundOperation<?>) delete;
+        });
+    }
+
     @Test
     public void testBackgroundExistsWithConcurrentClose() throws Exception {
         testBackgroundOperationWithConcurrentClose((client, future) -> {
@@ -1018,6 +1118,22 @@ public class TestFramework extends BaseClassForTests {
         });
     }
 
+    @Test
+    public void testBackgroundSetDataIdempotentWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean();
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) {
+                client.create().forPath("/bad-version-path", "version1".getBytes());
+            }
+            SetDataBuilder setData = client.setData();
+            setData.idempotent()
+                    .withVersion(333)
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/bad-version-path");
+            return (BackgroundOperation<?>) setData;
+        });
+    }
+
     @Test
     public void testBackgroundChildrenWithConcurrentClose() throws Exception {
         testBackgroundOperationWithConcurrentClose((client, future) -> {