You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/23 15:34:57 UTC
[pulsar] branch branch-2.8 updated: [improve][broker]Add retry for zk operation on connection loss (#17226)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new a9f3664ecc9 [improve][broker]Add retry for zk operation on connection loss (#17226)
a9f3664ecc9 is described below
commit a9f3664ecc95374474362507bb8927271c23dfe9
Author: Hang Chen <ch...@apache.org>
AuthorDate: Tue Aug 23 23:34:48 2022 +0800
[improve][broker]Add retry for zk operation on connection loss (#17226)
---
.../metadata/impl/AbstractMetadataStore.java | 6 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 81 +++++++++++++++++-----
2 files changed, 68 insertions(+), 19 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index a0ba33b3bee..55138a96f80 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -33,9 +33,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -59,7 +59,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
- private final ExecutorService executor;
+ protected final ScheduledExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -70,7 +70,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
protected AbstractMetadataStore() {
this.executor = Executors
- .newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store"));
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index e1e44f816c6..49d21bf46d3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.ZkUtils;
@@ -94,7 +95,11 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
+ getInternal(path, future);
+ return future;
+ }
+ private void getInternal(String path, CompletableFuture<Optional<GetResult>> future) {
try {
zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
execute(() -> {
@@ -120,7 +125,13 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
return null;
});
future.complete(Optional.empty());
- } else {
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ log.warn("Zookeeper connection loss, get {}, retry after 100ms", path);
+ executor.schedule(() -> getInternal(path, future),
+ 100, TimeUnit.MILLISECONDS);
+ } else {
future.completeExceptionally(getException(code, path));
}
}, future);
@@ -128,14 +139,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}
-
- return future;
}
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
CompletableFuture<List<String>> future = new CompletableFuture<>();
+ getChildrenFromStoreInternal(path, future);
+ return future;
+ }
+ private void getChildrenFromStoreInternal(String path, CompletableFuture<List<String>> future) {
try {
zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
execute(() -> {
@@ -163,6 +176,11 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
future.completeExceptionally(ex);
return null;
});
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ executor.schedule(() -> getChildrenFromStoreInternal(path, future),
+ 100, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(getException(code, path));
}
@@ -171,14 +189,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}
-
- return future;
}
@Override
public CompletableFuture<Boolean> existsFromStore(String path) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
+ existsFromStoreInternal(path, future);
+ return future;
+ }
+ private void existsFromStoreInternal(String path, CompletableFuture<Boolean> future) {
try {
zkc.exists(path, this, (rc, path1, ctx, stat) -> {
execute(() -> {
@@ -187,6 +207,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
future.complete(true);
} else if (code == Code.NONODE) {
future.complete(false);
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ log.warn("Zookeeper connection loss, existsFromStore {}, retry after 100ms", path);
+ executor.schedule(() -> existsFromStoreInternal(path, future),
+ 100, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(getException(code, path));
}
@@ -195,8 +221,6 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}
-
- return future;
}
@Override
@@ -206,12 +230,17 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
@Override
public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options) {
+ EnumSet<CreateOption> options) {
+ CompletableFuture<Stat> future = new CompletableFuture<>();
+ storePutInternal(path, value, optExpectedVersion, options, future);
+ return future;
+ }
+
+ private void storePutInternal(String path, byte[] value, Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption> options, CompletableFuture<Stat> future) {
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
- CompletableFuture<Stat> future = new CompletableFuture<>();
-
try {
if (hasVersion && expectedVersion == -1) {
CreateMode createMode = getCreateMode(options);
@@ -224,6 +253,13 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} else if (code == Code.NODEEXISTS) {
// We're emulating a request to create node, so the version is invalid
future.completeExceptionally(getException(Code.BADVERSION, path));
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
+ executor.schedule(() ->
+ storePutInternal(path, value, optExpectedVersion, options, future),
+ 100, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(getException(code, path));
}
@@ -248,6 +284,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
return null;
});
}
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
+ executor.schedule(() -> storePutInternal(path, value, optExpectedVersion, options, future),
+ 100, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(getException(code, path));
}
@@ -257,15 +299,18 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}
-
- return future;
}
@Override
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
- int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
-
CompletableFuture<Void> future = new CompletableFuture<>();
+ storeDeleteInternal(path, optExpectedVersion, future);
+ return future;
+ }
+
+ private void storeDeleteInternal(String path, Optional<Long> optExpectedVersion,
+ CompletableFuture<Void> future) {
+ int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
try {
zkc.delete(path, expectedVersion, (rc, path1, ctx) -> {
@@ -273,6 +318,12 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(null);
+ } else if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the individual operations
+ log.warn("Zookeeper connection loss, storeDelete {}, retry after 100ms", path);
+ executor.schedule(() -> storeDeleteInternal(path, optExpectedVersion, future),
+ 100, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(getException(code, path));
}
@@ -281,8 +332,6 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}
-
- return future;
}
@Override