You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/31 18:59:03 UTC
[pulsar] 01/02: [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 29bb685c2c31654a2f5aec91316ebc3097890e5b
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Dec 27 11:55:17 2022 +0100
[improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)
(cherry picked from commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f)
---
.../apache/pulsar/broker/resources/BaseResources.java | 7 +++++++
.../pulsar/broker/resources/NamespaceResources.java | 14 ++++++++++++--
.../org/apache/pulsar/metadata/api/MetadataStore.java | 11 +++++++++++
.../apache/pulsar/metadata/impl/ZKMetadataStore.java | 18 ++++++++++++++++++
4 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index bc670c53a8c..6f3fb7d714a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -92,6 +92,13 @@ public class BaseResources<T> {
return cache.get(path);
}
+ protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
+ return store.sync(path).thenCompose(___ -> {
+ cache.invalidate(path);
+ return cache.get(path);
+ });
+ }
+
protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 90e3971c4cf..5e59d20bfbf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -237,8 +237,18 @@ public class NamespaceResources extends BaseResources<Policies> {
}
public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
- return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
- tn.getEncodedLocalName()));
+ return getPartitionedTopicMetadataAsync(tn, false);
+ }
+
+ public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
+ boolean refresh) {
+ if (refresh) {
+ return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+ tn.getEncodedLocalName()));
+ } else {
+ return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+ tn.getEncodedLocalName()));
+ }
}
public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b4295f25867..79670e859d6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<Optional<GetResult>> get(String path);
+
+ /**
+ * Ensure that the next value read from the local client will be up-to-date with the latest version of the value
+ * as it can be seen by all the other clients.
+ * @param path
+ * @return a handle to the operation
+ */
+ default CompletableFuture<Void> sync(String path) {
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Return all the nodes (lexicographically sorted) that are children to the specific path.
*
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 6f980d3e794..33e708a76b9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -153,6 +153,24 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
}
}
+ @Override
+ public CompletableFuture<Void> sync(String path) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ zkc.sync(path, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String s, Object o) {
+ Code code = Code.get(rc);
+ if (code == Code.OK) {
+ result.complete(null);
+ } else {
+ MetadataStoreException e = getException(code, path);
+ result.completeExceptionally(e);
+ }
+ }
+ }, null);
+ return result;
+ }
+
@Override
protected void batchOperation(List<MetadataOp> ops) {
try {