You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/23 15:34:57 UTC

[pulsar] branch branch-2.8 updated: [improve][broker]Add retry for zk operation on connection loss (#17226)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new a9f3664ecc9 [improve][broker]Add retry for zk operation on connection loss (#17226)
a9f3664ecc9 is described below

commit a9f3664ecc95374474362507bb8927271c23dfe9
Author: Hang Chen <ch...@apache.org>
AuthorDate: Tue Aug 23 23:34:48 2022 +0800

    [improve][broker]Add retry for zk operation on connection loss (#17226)
---
 .../metadata/impl/AbstractMetadataStore.java       |  6 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      | 81 +++++++++++++++++-----
 2 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index a0ba33b3bee..55138a96f80 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -33,9 +33,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -59,7 +59,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
-    private final ExecutorService executor;
+    protected final ScheduledExecutorService executor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -70,7 +70,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected AbstractMetadataStore() {
         this.executor = Executors
-                .newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store"));
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index e1e44f816c6..49d21bf46d3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.util.ZkUtils;
@@ -94,7 +95,11 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
     @Override
     public CompletableFuture<Optional<GetResult>> get(String path) {
         CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
+        getInternal(path, future);
+        return future;
+    }
 
+    private void getInternal(String path, CompletableFuture<Optional<GetResult>> future) {
         try {
             zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
                 execute(() -> {
@@ -120,7 +125,13 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                             return null;
                         });
                         future.complete(Optional.empty());
-                    } else {
+                    } else if (code == Code.CONNECTIONLOSS) {
+                        // There is the chance that we caused a connection reset by sending or requesting a batch
+                        // that passed the max ZK limit. Retry with the individual operations
+                        log.warn("Zookeeper connection loss, get {}, retry after 100ms", path);
+                        executor.schedule(() -> getInternal(path, future),
+                            100, TimeUnit.MILLISECONDS);
+                    }  else {
                         future.completeExceptionally(getException(code, path));
                     }
                 }, future);
@@ -128,14 +139,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
         }
-
-        return future;
     }
 
     @Override
     public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         CompletableFuture<List<String>> future = new CompletableFuture<>();
+        getChildrenFromStoreInternal(path, future);
+        return future;
+    }
 
+    private void getChildrenFromStoreInternal(String path, CompletableFuture<List<String>> future) {
         try {
             zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
                 execute(() -> {
@@ -163,6 +176,11 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                             future.completeExceptionally(ex);
                             return null;
                         });
+                    } else if (code == Code.CONNECTIONLOSS) {
+                        // There is the chance that we caused a connection reset by sending or requesting a batch
+                        // that passed the max ZK limit. Retry with the individual operations
+                        executor.schedule(() -> getChildrenFromStoreInternal(path, future),
+                            100, TimeUnit.MILLISECONDS);
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
@@ -171,14 +189,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
         }
-
-        return future;
     }
 
     @Override
     public CompletableFuture<Boolean> existsFromStore(String path) {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
+        existsFromStoreInternal(path, future);
+        return future;
+    }
 
+    private void existsFromStoreInternal(String path, CompletableFuture<Boolean> future) {
         try {
             zkc.exists(path, this, (rc, path1, ctx, stat) -> {
                 execute(() -> {
@@ -187,6 +207,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                         future.complete(true);
                     } else if (code == Code.NONODE) {
                         future.complete(false);
+                    } else if (code == Code.CONNECTIONLOSS) {
+                        // There is the chance that we caused a connection reset by sending or requesting a batch
+                        // that passed the max ZK limit. Retry with the individual operations
+                        log.warn("Zookeeper connection loss, existsFromStore {}, retry after 100ms", path);
+                        executor.schedule(() -> existsFromStoreInternal(path, future),
+                            100, TimeUnit.MILLISECONDS);
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
@@ -195,8 +221,6 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
         }
-
-        return future;
     }
 
     @Override
@@ -206,12 +230,17 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
     @Override
     public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
-            EnumSet<CreateOption> options) {
+                                            EnumSet<CreateOption> options) {
+        CompletableFuture<Stat> future = new CompletableFuture<>();
+        storePutInternal(path, value, optExpectedVersion, options, future);
+        return future;
+    }
+
+    private void storePutInternal(String path, byte[] value, Optional<Long> optExpectedVersion,
+            EnumSet<CreateOption> options, CompletableFuture<Stat> future) {
         boolean hasVersion = optExpectedVersion.isPresent();
         int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
 
-        CompletableFuture<Stat> future = new CompletableFuture<>();
-
         try {
             if (hasVersion && expectedVersion == -1) {
                 CreateMode createMode = getCreateMode(options);
@@ -224,6 +253,13 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                                 } else if (code == Code.NODEEXISTS) {
                                     // We're emulating a request to create node, so the version is invalid
                                     future.completeExceptionally(getException(Code.BADVERSION, path));
+                                } else if (code == Code.CONNECTIONLOSS) {
+                                    // There is the chance that we caused a connection reset by sending or requesting a batch
+                                    // that passed the max ZK limit. Retry with the individual operations
+                                    log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
+                                    executor.schedule(() ->
+                                            storePutInternal(path, value, optExpectedVersion, options, future),
+                                        100, TimeUnit.MILLISECONDS);
                                 } else {
                                     future.completeExceptionally(getException(code, path));
                                 }
@@ -248,6 +284,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                                             return null;
                                         });
                             }
+                        } else if (code == Code.CONNECTIONLOSS) {
+                            // There is the chance that we caused a connection reset by sending or requesting a batch
+                            // that passed the max ZK limit. Retry with the individual operations
+                            log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
+                            executor.schedule(() -> storePutInternal(path, value, optExpectedVersion, options, future),
+                                100, TimeUnit.MILLISECONDS);
                         } else {
                             future.completeExceptionally(getException(code, path));
                         }
@@ -257,15 +299,18 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
         }
-
-        return future;
     }
 
     @Override
     public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
-        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
-
         CompletableFuture<Void> future = new CompletableFuture<>();
+        storeDeleteInternal(path, optExpectedVersion, future);
+        return future;
+    }
+
+    private void storeDeleteInternal(String path, Optional<Long> optExpectedVersion,
+                                                       CompletableFuture<Void> future) {
+        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
 
         try {
             zkc.delete(path, expectedVersion, (rc, path1, ctx) -> {
@@ -273,6 +318,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(null);
+                    } else if (code == Code.CONNECTIONLOSS) {
+                        // There is the chance that we caused a connection reset by sending or requesting a batch
+                        // that passed the max ZK limit. Retry with the individual operations
+                        log.warn("Zookeeper connection loss, storeDelete {}, retry after 100ms", path);
+                        executor.schedule(() -> storeDeleteInternal(path, optExpectedVersion, future),
+                            100, TimeUnit.MILLISECONDS);
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
@@ -281,8 +332,6 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
         }
-
-        return future;
     }
 
     @Override