You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/31 18:59:03 UTC

[pulsar] 01/02: [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 29bb685c2c31654a2f5aec91316ebc3097890e5b
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Dec 27 11:55:17 2022 +0100

    [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)
    
    (cherry picked from commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f)
---
 .../apache/pulsar/broker/resources/BaseResources.java  |  7 +++++++
 .../pulsar/broker/resources/NamespaceResources.java    | 14 ++++++++++++--
 .../org/apache/pulsar/metadata/api/MetadataStore.java  | 11 +++++++++++
 .../apache/pulsar/metadata/impl/ZKMetadataStore.java   | 18 ++++++++++++++++++
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index bc670c53a8c..6f3fb7d714a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -92,6 +92,13 @@ public class BaseResources<T> {
         return cache.get(path);
     }
 
+    protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
+        return store.sync(path).thenCompose(___ -> {
+            cache.invalidate(path);
+            return cache.get(path);
+        });
+    }
+
     protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
         try {
             setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 90e3971c4cf..5e59d20bfbf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -237,8 +237,18 @@ public class NamespaceResources extends BaseResources<Policies> {
         }
 
         public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
-            return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
-                    tn.getEncodedLocalName()));
+            return getPartitionedTopicMetadataAsync(tn, false);
+        }
+
+        public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
+                                                                                                      boolean refresh) {
+            if (refresh) {
+                return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                        tn.getEncodedLocalName()));
+            } else {
+                return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                        tn.getEncodedLocalName()));
+            }
         }
 
         public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b4295f25867..79670e859d6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
      */
     CompletableFuture<Optional<GetResult>> get(String path);
 
+
+    /**
+     * Ensure that the next value read from the local client will be up-to-date with the latest version of the value
+     * as it can be seen by all the other clients.
+     * @param path the path to synchronize
+     * @return a handle to the operation
+     */
+    default CompletableFuture<Void> sync(String path) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Return all the nodes (lexicographically sorted) that are children to the specific path.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 6f980d3e794..33e708a76b9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -153,6 +153,24 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
         }
     }
 
+    @Override
+    public CompletableFuture<Void> sync(String path) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        zkc.sync(path, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String s, Object o) {
+                Code code = Code.get(rc);
+                if (code == Code.OK) {
+                    result.complete(null);
+                } else {
+                    MetadataStoreException e = getException(code, path);
+                    result.completeExceptionally(e);
+                }
+            }
+        }, null);
+        return result;
+    }
+
     @Override
     protected void batchOperation(List<MetadataOp> ops) {
         try {