You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/18 10:38:04 UTC

[pulsar] 02/02: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes (#17401)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e14f069cca460b2d8709b359e830ee98616fcbaf
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Sep 8 21:21:46 2022 +0300

    [fix][broker] Fix issue where leader broker information isn't available after 10 minutes (#17401)
    
    * [fix][broker] Leader election cache should not invalidate entries
    
    - cache expiration leads to problems and it is better to serve a stale entry than to ever expire the
      entry
    - MetadataStore didn't support passing a cache configuration for a MetadataCache, so it
      was necessary to modify the interface to support that.
    
    * Add test case
    
    * Prevent stale values in leader election cache
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
    (cherry picked from commit 774c50b955d5be799ffbeee8c10d79f7dcb31d3b)
---
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java   |  61 +++++++++++
 .../MultiBrokerLeaderElectionExpirationTest.java   | 114 +++++++++++++++++++++
 .../loadbalance/MultiBrokerLeaderElectionTest.java |  38 +------
 .../pulsar/metadata/api/MetadataCacheConfig.java   |  50 +++++++++
 .../apache/pulsar/metadata/api/MetadataStore.java  |  57 ++++++++++-
 .../metadata/cache/impl/MetadataCacheImpl.java     |  25 +++--
 .../coordination/impl/LeaderElectionImpl.java      |  28 +++--
 .../metadata/impl/AbstractMetadataStore.java       |  13 +--
 .../metadata/impl/FaultInjectionMetadataStore.java |  13 +--
 9 files changed, 327 insertions(+), 72 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
new file mode 100644
index 00000000000..e6cf86c05b5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Multiple brokers with a real test ZooKeeper server (instead of the mock server).
+ */
+@Slf4j
+public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
+    TestZKServer testZKServer;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        testZKServer = new TestZKServer();
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+        if (testZKServer != null) {
+            try {
+                testZKServer.close();
+            } catch (Exception e) {
+                log.error("Error in stopping ZK server", e);
+            }
+        }
+    }
+
+    @Override
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+
+    @Override
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java
new file mode 100644
index 00000000000..65962868b87
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class MultiBrokerLeaderElectionExpirationTest extends MultiBrokerTestZKBaseTest {
+    private static final long EXPIRE_AFTER_WRITE_MILLIS_IN_TEST = 2000L;
+    private static final long REFRESH_AFTER_WRITE_MILLIS_IN_TEST = 1000L;
+
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 9;
+    }
+
+    @Test
+    public void shouldElectOneLeader() {
+        int leaders = 0;
+        for (PulsarService broker : getAllBrokers()) {
+            if (broker.getLeaderElectionService().isLeader()) {
+                leaders++;
+            }
+        }
+        assertEquals(leaders, 1);
+    }
+
+    @Override
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return changeDefaultMetadataCacheConfig(super.createLocalMetadataStore());
+    }
+
+    @Override
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return changeDefaultMetadataCacheConfig(super.createConfigurationMetadataStore());
+    }
+
+    MetadataStoreExtended changeDefaultMetadataCacheConfig(MetadataStoreExtended metadataStore) {
+        MetadataStoreExtended spy = spy(metadataStore);
+        when(spy.getDefaultMetadataCacheConfig()).thenReturn(MetadataCacheConfig
+                .builder()
+                .refreshAfterWriteMillis(REFRESH_AFTER_WRITE_MILLIS_IN_TEST)
+                .expireAfterWriteMillis(EXPIRE_AFTER_WRITE_MILLIS_IN_TEST)
+                .build());
+        return spy;
+    }
+
+    @Test
+    public void shouldAllBrokersBeAbleToGetTheLeaderAfterExpiration()
+            throws ExecutionException, InterruptedException, TimeoutException {
+
+        // if you want to see this test fail, modify the line in LeaderElectionImpl constructor for creating
+        // the metadata cache to not skip expirations:
+        // this.cache = store.getMetadataCache(clazz);
+
+        // Given that all brokers have the leader elected
+        Awaitility.await().untilAsserted(() -> {
+            for (PulsarService broker : getAllBrokers()) {
+                Optional<LeaderBroker> currentLeader = broker.getLeaderElectionService().getCurrentLeader();
+                assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+            }
+        });
+
+        // Wait for metadata cache entries to expire
+        Thread.sleep(EXPIRE_AFTER_WRITE_MILLIS_IN_TEST);
+
+        // then leader should be known on all brokers and it should be the same leader
+        LeaderBroker leader = null;
+        for (PulsarService broker : getAllBrokers()) {
+            Optional<LeaderBroker> currentLeader =
+                    broker.getLeaderElectionService().readCurrentLeader().get(1, TimeUnit.SECONDS);
+            assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+            if (leader != null) {
+                assertEquals(currentLeader.get(), leader,
+                        "Different leader on broker " + broker.getBrokerServiceUrl());
+            } else {
+                leader = currentLeader.get();
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 462b640c175..36ed1ca6c61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -34,55 +34,21 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.metadata.TestZKServer;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
-public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
     @Override
     protected int numberOfAdditionalBrokers() {
         return 9;
     }
 
-    TestZKServer testZKServer;
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        testZKServer = new TestZKServer();
-    }
-
-    @Override
-    protected void onCleanup() {
-        super.onCleanup();
-        if (testZKServer != null) {
-            try {
-                testZKServer.close();
-            } catch (Exception e) {
-                log.error("Error in stopping ZK server", e);
-            }
-        }
-    }
-
-    @Override
-    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
-        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
-    }
-
-    @Override
-    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
-        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
-    }
-
     @Test
     public void shouldElectOneLeader() {
         int leaders = 0;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
new file mode 100644
index 00000000000..1671fd39956
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * The configuration for a {@link MetadataCache}, created through its builder.
+ */
+@Builder
+@Getter
+@ToString
+public class MetadataCacheConfig {
+    private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+    /**
+     * Specifies that active entries are eligible for automatic refresh once a fixed duration has
+     * elapsed after the entry's creation, or the most recent replacement of its value.
+     * A negative or zero value disables automatic refresh.
+     */
+    @Builder.Default
+    private final long refreshAfterWriteMillis = DEFAULT_CACHE_REFRESH_TIME_MILLIS;
+
+    /**
+     * Specifies that each entry should be automatically removed from the cache once a fixed duration
+     * has elapsed after the entry's creation, or the most recent replacement of its value.
+     * A negative or zero value disables automatic expiration.
+     */
+    @Builder.Default
+    private final long expireAfterWriteMillis = 2 * DEFAULT_CACHE_REFRESH_TIME_MILLIS;
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index fa89d7e3dd2..b4295f25867 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -136,9 +136,23 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param clazz
      *            the class type to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+    <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param clazz
+     *            the class type to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+        return getMetadataCache(clazz, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache specialized for a specific class.
@@ -146,9 +160,23 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param typeRef
      *            the type ref description to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
+    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param typeRef
+     *            the type ref description to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+        return getMetadataCache(typeRef, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache that uses a particular serde object.
@@ -156,7 +184,30 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param serde
      *            the custom serialization/deserialization object
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache that uses a particular serde object.
+     *
+     * @param <T>
+     * @param serde
+     *            the custom serialization/deserialization object
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
+        return getMetadataCache(serde, getDefaultMetadataCacheConfig());
+    }
+
+    /**
+     * Returns the default metadata cache config.
+     *
+     * @return default metadata cache config
+     */
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 67fa7afd00f..2cbe9a6dc19 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
@@ -50,30 +51,32 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 
 @Slf4j
 public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {
-
-    private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
-
     @Getter
     private final MetadataStore store;
     private final MetadataSerde<T> serde;
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
 
-    public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef) {
-        this(store, new JSONMetadataSerdeTypeRef<>(typeRef));
+    public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+        this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig);
     }
 
-    public MetadataCacheImpl(MetadataStore store, JavaType type) {
-        this(store, new JSONMetadataSerdeSimpleType<>(type));
+    public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig) {
+        this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig);
     }
 
-    public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
+    public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
         this.store = store;
         this.serde = serde;
 
-        this.objCache = Caffeine.newBuilder()
-                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
-                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
+        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
+        if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
+            cacheBuilder.refreshAfterWrite(cacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
+        }
+        if (cacheConfig.getExpireAfterWriteMillis() > 0) {
+            cacheBuilder.expireAfterWrite(cacheConfig.getExpireAfterWriteMillis(), TimeUnit.MILLISECONDS);
+        }
+        this.objCache = cacheBuilder
                 .buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {
                     @Override
                     public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index af873201e96..86fac33f7c2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
@@ -32,6 +33,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -52,6 +54,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
     private final MetadataStoreExtended store;
     private final MetadataCache<T> cache;
     private final Consumer<LeaderElectionState> stateChangesListener;
+    private final ScheduledFuture<?> updateCachedValueFuture;
 
     private LeaderElectionState leaderElectionState;
     private Optional<Long> version = Optional.empty();
@@ -73,7 +76,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         this.path = path;
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
         this.store = store;
-        this.cache = store.getMetadataCache(clazz);
+        MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
+                .expireAfterWriteMillis(-1L)
+                .build();
+        this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
@@ -81,6 +87,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
+        updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+                metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
+                metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -102,10 +111,13 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
             } else {
                 return tryToBecomeLeader();
             }
-        }).thenCompose(leaderElectionState ->
-                // make sure that the cache contains the current leader
-                // so that getLeaderValueIfPresent works on all brokers
-                cache.get(path).thenApply(__ -> leaderElectionState));
+        }).thenComposeAsync(leaderElectionState -> {
+            // make sure that the cache contains the current leader
+            // so that getLeaderValueIfPresent works on all brokers
+            cache.refresh(path);
+            return cache.get(path)
+                    .thenApply(__ -> leaderElectionState);
+        }, executor);
     }
 
     private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
@@ -207,11 +219,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                         // There was a conflict between 2 participants trying to become leaders at same time. Retry
                         // to fetch info on new leader.
 
-                        // We force the invalidation of the cache entry. Since we received a BadVersion error, we
-                        // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
-                        // leader election many times until we finally receive the notification that invalidates the
-                        // cache.
-                        cache.invalidate(path);
                         elect()
                             .thenAccept(lse -> result.complete(lse))
                             .exceptionally(ex2 -> {
@@ -229,6 +236,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
     @Override
     public void close() throws Exception {
+        updateCachedValueFuture.cancel(true);
         try {
             asyncClose().join();
         } catch (CompletionException e) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 8986811ad9f..4b9ff914fcf 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -212,23 +213,23 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
         MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
-                TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+                TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
-        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef);
+    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
-        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde);
+    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
+        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 679b49a740b..4972570654c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -31,6 +31,7 @@ import lombok.Data;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -146,18 +147,18 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended {
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
-        return store.getMetadataCache(clazz);
+    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
+        return store.getMetadataCache(clazz, cacheConfig);
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
-        return store.getMetadataCache(typeRef);
+    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+        return store.getMetadataCache(typeRef, cacheConfig);
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
-        return store.getMetadataCache(serde);
+    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
+        return store.getMetadataCache(serde, cacheConfig);
     }
 
     @Override