You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ke...@apache.org on 2023/06/08 16:25:57 UTC

[curator] branch master updated: CURATOR-673: Complete BackgroundCallback if CuratorFramework got closed (#464)

This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c889cc CURATOR-673: Complete BackgroundCallback if CuratorFramework got closed (#464)
f3c889cc is described below

commit f3c889cc3988974f47746e42c5dabd6dc6683531
Author: Kezhu Wang <ke...@apache.org>
AuthorDate: Fri Jun 9 00:25:51 2023 +0800

    CURATOR-673: Complete BackgroundCallback if CuratorFramework got closed (#464)
    
    Previously, if the curator framework was closed while background operations
    were pending, those operations were dropped without any dedicated events.
    
    This makes it hard for blocking clients to stop waiting after the framework
    is closed. `CuratorListener` looks like a possible workaround, but it is
    hard to use for this purpose, and I don't think that is by design.
    
    This PR introduces a `closeLock` to synchronize framework closing and
    operation queuing, so we can be sure that no operations are queued
    after the framework is marked as closing.
    
    It also terminates operations whose retries are exhausted due to a
    background exception.
    
    Co-authored-by: tison <wa...@gmail.com>
---
 .../curator/framework/api/CuratorEventType.java    |   6 +-
 .../framework/imps/AddWatchBuilderImpl.java        |   5 +
 .../framework/imps/BackgroundOperation.java        |   9 +
 .../curator/framework/imps/BackgroundSyncImpl.java |   5 +
 .../curator/framework/imps/CreateBuilderImpl.java  |  20 ++
 .../framework/imps/CuratorFrameworkImpl.java       | 140 +++++++++++--
 .../imps/CuratorMultiTransactionImpl.java          |   5 +
 .../curator/framework/imps/DeleteBuilderImpl.java  |  10 +
 .../curator/framework/imps/ExistsBuilderImpl.java  |   5 +
 .../FindAndDeleteProtectedNodeInBackground.java    |   5 +
 .../curator/framework/imps/GetACLBuilderImpl.java  |   5 +
 .../framework/imps/GetChildrenBuilderImpl.java     |   5 +
 .../framework/imps/GetConfigBuilderImpl.java       |   5 +
 .../curator/framework/imps/GetDataBuilderImpl.java |   5 +
 .../curator/framework/imps/OperationAndData.java   |   5 +
 .../framework/imps/ReconfigBuilderImpl.java        |   5 +
 .../framework/imps/RemoveWatchesBuilderImpl.java   |   5 +
 .../curator/framework/imps/SetACLBuilderImpl.java  |   5 +
 .../curator/framework/imps/SetDataBuilderImpl.java |  10 +
 .../curator/framework/imps/SyncBuilderImpl.java    |   5 +
 .../curator/framework/imps/TestFramework.java      | 231 +++++++++++++++++++++
 curator-test-zk35/pom.xml                          |   6 +
 curator-test-zk36/pom.xml                          |   6 +
 .../org/apache/curator/test/BaseClassForTests.java |   6 +
 24 files changed, 492 insertions(+), 22 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index d5693cad..d78ab4fa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -54,7 +54,7 @@ public enum CuratorEventType {
     CHILDREN,
 
     /**
-     * Corresponds to {@link CuratorFramework#sync(String, Object)}
+     * Corresponds to {@link CuratorFramework#sync()}
      */
     SYNC,
 
@@ -89,7 +89,7 @@ public enum CuratorEventType {
     WATCHED,
 
     /**
-     * Corresponds to {@link CuratorFramework#watches()} ()}
+     * Corresponds to {@link CuratorFramework#watchers()}
      */
     REMOVE_WATCHES,
 
@@ -99,7 +99,7 @@ public enum CuratorEventType {
     CLOSING,
 
     /**
-     * Corresponds to {@link CuratorFramework#watches()}
+     * Corresponds to {@link CuratorFramework#watchers()}
      */
     ADD_WATCH
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
index 76868e8b..923edbc7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -119,6 +119,11 @@ public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, Bac
         return null;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.ADD_WATCH;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> data) throws Exception {
         String path = data.getData();
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundOperation.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundOperation.java
index 2fd81225..3ce26e1f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundOperation.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundOperation.java
@@ -19,6 +19,15 @@
 
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.framework.api.CuratorEventType;
+
 interface BackgroundOperation<T> {
     public void performBackgroundOperation(OperationAndData<T> data) throws Exception;
+
+    /**
+     * Most events are delivered by the operations themselves, so they know their event types.
+     * But in occasional cases (say, closing or a background exception), events are delivered
+     * by the curator framework.
+     */
+    CuratorEventType getBackgroundEventType();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
index f8edef3a..79785511 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/BackgroundSyncImpl.java
@@ -32,6 +32,11 @@ class BackgroundSyncImpl implements BackgroundOperation<String> {
         this.context = context;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.SYNC;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 6d07aa01..ef765769 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -579,6 +579,11 @@ public class CreateBuilderImpl
         }
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.CREATE;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception {
         try {
@@ -739,6 +744,11 @@ public class CreateBuilderImpl
                 }
                 client.queueOperation(mainOperationAndData);
             }
+
+            @Override
+            public CuratorEventType getBackgroundEventType() {
+                return CuratorEventType.CREATE;
+            }
         };
         OperationAndData<T> parentOperation = new OperationAndData<>(
                 operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null);
@@ -775,6 +785,11 @@ public class CreateBuilderImpl
                     // ignore
                 }
             }
+
+            @Override
+            public CuratorEventType getBackgroundEventType() {
+                return CuratorEventType.CREATE;
+            }
         };
         client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
     }
@@ -813,6 +828,11 @@ public class CreateBuilderImpl
                     client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e);
                 }
             }
+
+            @Override
+            public CuratorEventType getBackgroundEventType() {
+                return CuratorEventType.CREATE;
+            }
         };
         client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 9683b886..9de2e632 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -135,6 +135,8 @@ public class CuratorFrameworkImpl implements CuratorFramework {
 
     private final AtomicReference<CuratorFrameworkState> state;
 
+    private final Object closeLock = new Object();
+
     public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
         ZookeeperFactory localZookeeperFactory =
                 makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
@@ -376,10 +378,24 @@ public class CuratorFrameworkImpl implements CuratorFramework {
         }
     }
 
+    /**
+     * Change state from {@link CuratorFrameworkState#STARTED} to {@link CuratorFrameworkState#STOPPED}
+     * in {@link #closeLock}.
+     *
+     * <p>This gives us synchronized view of {@link #state} and other components in closing.</p>
+     *
+     * @return true if state changed by this call
+     */
+    private boolean closeWithLock() {
+        synchronized (closeLock) {
+            return state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED);
+        }
+    }
+
     @Override
     public void close() {
         log.debug("Closing");
-        if (state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED)) {
+        if (closeWithLock()) {
             listeners.forEach(listener -> {
                 CuratorEvent event = new CuratorEventImpl(
                         CuratorFrameworkImpl.this,
@@ -415,6 +431,9 @@ public class CuratorFrameworkImpl implements CuratorFramework {
             if (ensembleTracker != null) {
                 ensembleTracker.close();
             }
+            OperationAndData<?>[] droppedOperations = backgroundOperations.toArray(new OperationAndData<?>[0]);
+            backgroundOperations.clear();
+            Arrays.stream(droppedOperations).forEach(this::closeOperation);
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();
@@ -644,15 +663,103 @@ public class CuratorFrameworkImpl implements CuratorFramework {
         }
     }
 
+    private void abortOperation(OperationAndData<?> operation, Throwable e) {
+        if (operation.getCallback() == null) {
+            return;
+        }
+        CuratorEvent event;
+        if (e instanceof KeeperException) {
+            event = new CuratorEventImpl(
+                    this,
+                    operation.getEventType(),
+                    ((KeeperException) e).code().intValue(),
+                    ((KeeperException) e).getPath(),
+                    null,
+                    operation.getContext(),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else if (getState() == CuratorFrameworkState.STARTED) {
+            event = new CuratorEventImpl(
+                    this,
+                    operation.getEventType(),
+                    KeeperException.Code.SYSTEMERROR.intValue(),
+                    null,
+                    null,
+                    operation.getContext(),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        } else {
+            event = new CuratorEventImpl(
+                    this,
+                    operation.getEventType(),
+                    KeeperException.Code.SESSIONEXPIRED.intValue(),
+                    null,
+                    null,
+                    operation.getContext(),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+        sendToBackgroundCallback(operation, event);
+    }
+
+    private void closeOperation(OperationAndData<?> operation) {
+        if (operation.getCallback() == null) {
+            return;
+        }
+        CuratorEvent event = new CuratorEventImpl(
+                this,
+                operation.getEventType(),
+                KeeperException.Code.SESSIONEXPIRED.intValue(),
+                null,
+                null,
+                operation.getContext(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null);
+        sendToBackgroundCallback(operation, event);
+    }
+
+    private void requeueSleepOperation(OperationAndData<?> operationAndData) {
+        operationAndData.clearSleep();
+        synchronized (closeLock) {
+            if (getState() == CuratorFrameworkState.STARTED) {
+                if (backgroundOperations.remove(operationAndData)) {
+                    // due to the internals of DelayQueue, operation must be removed/re-added so that re-sorting occurs
+                    backgroundOperations.offer(operationAndData);
+                } // This operation has been taken over by background thread.
+                return;
+            }
+        }
+        closeOperation(operationAndData);
+    }
+
     /**
      * @param operationAndData operation entry
      * @return true if the operation was actually queued, false if not
      */
     <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData) {
-        if (getState() == CuratorFrameworkState.STARTED) {
-            backgroundOperations.offer(operationAndData);
-            return true;
+        synchronized (closeLock) {
+            if (getState() == CuratorFrameworkState.STARTED) {
+                backgroundOperations.offer(operationAndData);
+                return true;
+            }
         }
+        closeOperation(operationAndData);
         return false;
     }
 
@@ -831,7 +938,8 @@ public class CuratorFrameworkImpl implements CuratorFramework {
             operationAndData.getCallback().processResult(this, event);
         } catch (Exception e) {
             ThreadUtils.checkInterrupted(e);
-            handleBackgroundOperationException(operationAndData, e);
+            // This operation is already completed, and we don't retry a completed operation.
+            handleBackgroundOperationException(null, e);
         }
     }
 
@@ -851,7 +959,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                     if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)) {
                         log.debug("Retrying operation");
                     }
-                    backgroundOperations.offer(operationAndData);
+                    queueOperation(operationAndData);
                     break;
                 } else {
                     if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)) {
@@ -863,6 +971,10 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                 }
             }
 
+            if (operationAndData != null) {
+                abortOperation(operationAndData, e);
+            }
+
             logError("Background exception was not retry-able or retry gave up", e);
         } while (false);
     }
@@ -905,11 +1017,9 @@ public class CuratorFrameworkImpl implements CuratorFramework {
              * Fix edge case reported as CURATOR-52. Connection timeout is detected when the initial (or previously failed) connection
              * cannot be re-established. This needs to be run through the retry policy and callbacks need to get invoked, etc.
              */
-            WatchedEvent watchedEvent =
-                    new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
             CuratorEvent event = new CuratorEventImpl(
                     this,
-                    CuratorEventType.WATCHED,
+                    operationAndData.getEventType(),
                     KeeperException.Code.CONNECTIONLOSS.intValue(),
                     null,
                     null,
@@ -917,7 +1027,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                     null,
                     null,
                     null,
-                    watchedEvent,
+                    null,
                     null,
                     null);
             if (checkBackgroundRetry(operationAndData, event)) {
@@ -945,15 +1055,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
         Collection<OperationAndData<?>> drain = new ArrayList<>(forcedSleepOperations.size());
         forcedSleepOperations.drainTo(drain);
         log.debug("Clearing sleep for {} operations", drain.size());
-        for (OperationAndData<?> operation : drain) {
-            operation.clearSleep();
-            if (backgroundOperations.remove(
-                    operation)) // due to the internals of DelayQueue, operation must be removed/re-added so that
-            // re-sorting occurs
-            {
-                backgroundOperations.offer(operation);
-            }
-        }
+        drain.forEach(this::requeueSleepOperation);
     }
 
     private void processEvent(final CuratorEvent curatorEvent) {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
index 4a9ed7b8..befe46d0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionImpl.java
@@ -151,6 +151,11 @@ public class CuratorMultiTransactionImpl
         }
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.TRANSACTION;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData)
             throws Exception {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index efd177dd..31dbf911 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -161,6 +161,11 @@ public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<Str
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.DELETE;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
@@ -219,6 +224,11 @@ public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<Str
                 }
                 client.queueOperation(mainOperationAndData);
             }
+
+            @Override
+            public CuratorEventType getBackgroundEventType() {
+                return CuratorEventType.DELETE;
+            }
         };
         OperationAndData<String> parentOperation = new OperationAndData<String>(
                 operation, mainOperationAndData.getData(), null, null, backgrounding.getContext(), null);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 0b4661d6..102ab823 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -139,6 +139,11 @@ public class ExistsBuilderImpl
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.EXISTS;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
index 697e35a4..36814db3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
@@ -61,6 +61,11 @@ class FindAndDeleteProtectedNodeInBackground implements BackgroundOperation<Void
     @VisibleForTesting
     static final AtomicBoolean debugInsertError = new AtomicBoolean(false);
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.CHILDREN;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception {
         final OperationTrace trace =
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
index e04eec00..2b83ace8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetACLBuilderImpl.java
@@ -101,6 +101,11 @@ public class GetACLBuilderImpl implements GetACLBuilder, BackgroundOperation<Str
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.GET_ACL;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 9089988b..6be18d4d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -151,6 +151,11 @@ public class GetChildrenBuilderImpl
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.CHILDREN;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index 5ec913e9..0dca255f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -182,6 +182,11 @@ public class GetConfigBuilderImpl
         }
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.GET_CONFIG;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index a031200f..7219cb51 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -213,6 +213,11 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.GET_DATA;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 14a08312..be15d882 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.curator.RetrySleeper;
 import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEventType;
 
 class OperationAndData<T> implements Delayed, RetrySleeper {
     private static final AtomicLong nextOrdinal = new AtomicLong();
@@ -113,6 +114,10 @@ class OperationAndData<T> implements Delayed, RetrySleeper {
         return operation;
     }
 
+    CuratorEventType getEventType() {
+        return operation.getBackgroundEventType();
+    }
+
     void clearSleep() {
         sleepUntilTimeMs.set(0);
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index f30397d5..aed240c4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -225,6 +225,11 @@ public class ReconfigBuilderImpl
         };
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.RECONFIG;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<Void> data) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index c93f59fb..e0c7de85 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -275,6 +275,11 @@ public class RemoveWatchesBuilderImpl
         return namespaceWatcher;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.REMOVE_WATCHES;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
index c54328d8..a3e9e367 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetACLBuilderImpl.java
@@ -127,6 +127,11 @@ public class SetACLBuilderImpl
         return resultStat;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.SET_ACL;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 73472cac..325053aa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -236,10 +236,20 @@ public class SetDataBuilderImpl
                     client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e);
                 }
             }
+
+            @Override
+            public CuratorEventType getBackgroundEventType() {
+                return CuratorEventType.SET_DATA;
+            }
         };
         client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.SET_DATA;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
index db599cd3..d3c3a3a5 100755
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SyncBuilderImpl.java
@@ -86,6 +86,11 @@ public class SyncBuilderImpl implements SyncBuilder, BackgroundOperation<String>
         return this;
     }
 
+    @Override
+    public CuratorEventType getBackgroundEventType() {
+        return CuratorEventType.SYNC;
+    }
+
     @Override
     public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
         try {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index f66fec64..5c7fb1fa 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.imps;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -32,16 +33,30 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.RetrySleeper;
 import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetACLBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.SetACLBuilder;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
@@ -800,6 +815,222 @@ public class TestFramework extends BaseClassForTests {
         }
     }
 
+    private static class AlwaysRetry implements RetryPolicy { // test policy: retries without any attempt limit
+        private final int retryIntervalMs; // pause before each allowed retry, in milliseconds
+
+        public AlwaysRetry(int retryIntervalMs) {
+            this.retryIntervalMs = retryIntervalMs;
+        }
+
+        @Override
+        public boolean allowRetry(Throwable exception) {
+            return exception instanceof KeeperException; // only ZooKeeper errors qualify for a retry
+        }
+
+        @Override
+        public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper retrySleeper) {
+            try {
+                retrySleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            }
+            return true; // retryCount and elapsedTimeMs are ignored: always allow another attempt
+        }
+    }
+
+    /**
+     * Blocks until curator is fully stopped, to test operations that were initiated before the stop but run after it.
+     */
+    private static class BlockUntilFullyStopped implements CuratorFrameworkImpl.DebugBackgroundListener {
+        private final CuratorFramework client;
+        private final BackgroundOperation<?> operation; // the only operation this listener reacts to
+        private final long maxRuns; // number of listen() calls for the operation that pass without blocking
+        private long runs = 0;
+
+        public BlockUntilFullyStopped(CuratorFramework client, BackgroundOperation<?> operation, long maxRuns) {
+            this.client = client;
+            this.operation = operation;
+            this.maxRuns = maxRuns;
+        }
+
+        @Override
+        public void listen(OperationAndData<?> data) {
+            if (operation != data.getOperation()) { // identity check: ignore every other operation
+                return;
+            }
+            runs++;
+            if (runs > maxRuns) { // the first maxRuns calls return immediately
+                while (!(client.getState() == CuratorFrameworkState.STOPPED
+                        && !client.getZookeeperClient().isConnected())) { // i.e. until STOPPED and disconnected
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ignored) { // interrupt is ignored so polling continues
+                    }
+                }
+            }
+        }
+    }
+
+    private interface BackgroundOperationFactory { // starts one background operation wired to the given future
+        BackgroundOperation<?> create(CuratorFramework client, CompletableFuture<CuratorEvent> future) throws Exception;
+    }
+
+    private void testBackgroundOperationWithConcurrentCloseAndChaosStalls(
+            BackgroundOperationFactory operationFactory, long maxRuns, long[] millisStalls) throws Exception {
+        AlwaysRetry alwaysRetry = new AlwaysRetry(2); // 2 ms pause before each retry
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), alwaysRetry);
+        client.start();
+        try {
+            // given: a failing background request under an always-retry policy
+            CompletableFuture<CuratorEvent> future = new CompletableFuture<>();
+            BackgroundOperation<?> operation = operationFactory.create(client, future);
+
+            // These chaos steps create opportunities for races between the running operation and close().
+            // If there are bugs, they could fail this test given enough runs.
+            if (maxRuns > 0) { // non-positive maxRuns: no debug listener is installed
+                ((CuratorFrameworkImpl) client).debugListener = new BlockUntilFullyStopped(client, operation, maxRuns);
+            }
+            for (long ms : millisStalls) {
+                if (ms >= 0) { // non-negative entry: stall for that many milliseconds
+                    Thread.sleep(ms);
+                } else { // negative entry: restart the test server instead
+                    restartServer();
+                }
+            }
+
+            // when: the client is closed while the operation is queuing, retrying or waking from sleep
+            client.close();
+
+            // then: the callback receives a closing event carrying the session-expired error code
+            CuratorEvent event = future.get(10, TimeUnit.SECONDS);
+            assertThat(event.getResultCode()).isEqualTo(KeeperException.Code.SESSIONEXPIRED.intValue());
+            assertThat(event.getType()).isSameAs(operation.getBackgroundEventType());
+            assertThat(event.getContext()).isSameAs(future);
+        } finally {
+            CloseableUtils.closeQuietly(client); // also covers the paths where an earlier step throws
+        }
+    }
+
+    private void testBackgroundOperationWithConcurrentClose(BackgroundOperationFactory operationFactory)
+            throws Exception { // three scenarios; in the stalls array a negative entry means a server restart
+        testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {20, -1, 5});
+        testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, -1, new long[] {10});
+        testBackgroundOperationWithConcurrentCloseAndChaosStalls(operationFactory, 2, new long[] {20});
+    }
+
+    @Test
+    public void testBackgroundCreateWithConcurrentClose() throws Exception {
+        AtomicBoolean retry = new AtomicBoolean(); // set on the first factory call so the node is created only once
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            if (retry.compareAndSet(false, true)) { // true only for the first invocation of this factory
+                try {
+                    client.create().forPath("/exist-path");
+                } catch (KeeperException ex) {
+                    throw new IllegalStateException(ex);
+                }
+            }
+            CreateBuilder create = client.create();
+            create.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/exist-path"); // same path as the node created on the first invocation
+            return (BackgroundOperation<?>) create;
+        });
+    }
+
+    @Test
+    public void testBackgroundDeleteWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            DeleteBuilder delete = client.delete();
+            delete.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) delete;
+        });
+    }
+
+    @Test
+    public void testBackgroundExistsWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            ExistsBuilder exists = client.checkExists();
+            exists.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) exists;
+        });
+    }
+
+    @Test
+    public void testBackgroundGetDataWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            GetDataBuilder getData = client.getData();
+            getData.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) getData;
+        });
+    }
+
+    @Test
+    public void testBackgroundSetDataWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            SetDataBuilder setData = client.setData();
+            setData.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) setData;
+        });
+    }
+
+    @Test
+    public void testBackgroundChildrenWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            GetChildrenBuilder children = client.getChildren();
+            children.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) children;
+        });
+    }
+
+    @Test
+    public void testBackgroundGetACLWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            GetACLBuilder getACL = client.getACL();
+            getACL.inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) getACL;
+        });
+    }
+
+    @Test
+    public void testBackgroundSetACLWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            SetACLBuilder setACL = client.setACL();
+            setACL.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return (BackgroundOperation<?>) setACL;
+        });
+    }
+
+    @Test
+    public void testBackgroundTransactionWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            CuratorOp delete = client.transactionOp().delete().forPath("/not-exist-path");
+            CuratorMultiTransaction transaction = client.transaction();
+            transaction
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forOperations(delete);
+            return (BackgroundOperation<?>) transaction;
+        });
+    }
+
+    @Test
+    public void testBackgroundRemoveWatchesWithConcurrentClose() throws Exception {
+        testBackgroundOperationWithConcurrentClose((client, future) -> {
+            RemoveWatchesBuilderImpl removeWatches =
+                    (RemoveWatchesBuilderImpl) client.watches().removeAll();
+            removeWatches
+                    .inBackground((ignored, event) -> future.complete(event), future)
+                    .forPath("/not-exist-path");
+            return removeWatches;
+        });
+    }
+
     @Test
     public void testBackgroundGetDataWithWatch() throws Exception {
         final byte[] data1 = {1, 2, 3};
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index e0b5a780..84cf0df0 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -140,6 +140,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
diff --git a/curator-test-zk36/pom.xml b/curator-test-zk36/pom.xml
index 41707356..93774a2a 100644
--- a/curator-test-zk36/pom.xml
+++ b/curator-test-zk36/pom.xml
@@ -153,6 +153,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index fe1393ef..15bd715a 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -118,6 +118,12 @@ public class BaseClassForTests {
         }
     }
 
+    protected void restartServer() throws Exception {
+        if (server != null) {
+            server.restart();
+        }
+    }
+
     @AfterEach
     public void teardown() throws Exception {
         System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY);