You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/18 12:02:05 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes (#17401)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new e13f089d04f [fix][broker] Fix issue where leader broker information isn't available after 10 minutes (#17401)
e13f089d04f is described below
commit e13f089d04f839d50bc40eccd8306f0df3816c73
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Sep 8 21:21:46 2022 +0300
[fix][broker] Fix issue where leader broker information isn't available after 10 minutes (#17401)
* [fix][broker] Leader election cache should not invalidate entries
- cache expiration leads to problems and it is better to serve a stale entry than to ever expire the
entry
- MetadataStore didn't support passing cache configuration for a MetadataCache, it
was necessary to modify the interface to support that.
* Add test case
* Prevent stale values in leader election cache
Co-authored-by: Enrico Olivelli <eo...@apache.org>
(cherry picked from commit 774c50b955d5be799ffbeee8c10d79f7dcb31d3b)
---
.../pulsar/broker/MultiBrokerTestZKBaseTest.java | 61 +++++++++++
.../MultiBrokerLeaderElectionExpirationTest.java | 114 +++++++++++++++++++++
.../loadbalance/MultiBrokerLeaderElectionTest.java | 38 +------
.../pulsar/metadata/api/MetadataCacheConfig.java | 50 +++++++++
.../apache/pulsar/metadata/api/MetadataStore.java | 57 ++++++++++-
.../metadata/cache/impl/MetadataCacheImpl.java | 25 +++--
.../coordination/impl/LeaderElectionImpl.java | 28 +++--
.../metadata/impl/AbstractMetadataStore.java | 13 +--
.../metadata/impl/FaultInjectionMetadataStore.java | 13 +--
9 files changed, 327 insertions(+), 72 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
new file mode 100644
index 00000000000..e6cf86c05b5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Multiple brokers with a real test Zookeeper server (instead of the mock server)
+ */
+@Slf4j
+public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
+ TestZKServer testZKServer;
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ testZKServer = new TestZKServer();
+ }
+
+ @Override
+ protected void onCleanup() {
+ super.onCleanup();
+ if (testZKServer != null) {
+ try {
+ testZKServer.close();
+ } catch (Exception e) {
+ log.error("Error in stopping ZK server", e);
+ }
+ }
+ }
+
+ @Override
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
+
+ @Override
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java
new file mode 100644
index 00000000000..65962868b87
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionExpirationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class MultiBrokerLeaderElectionExpirationTest extends MultiBrokerTestZKBaseTest {
+ private static final long EXPIRE_AFTER_WRITE_MILLIS_IN_TEST = 2000L;
+ private static final long REFRESH_AFTER_WRITE_MILLIS_IN_TEST = 1000L;
+
+ @Override
+ protected int numberOfAdditionalBrokers() {
+ return 9;
+ }
+
+ @Test
+ public void shouldElectOneLeader() {
+ int leaders = 0;
+ for (PulsarService broker : getAllBrokers()) {
+ if (broker.getLeaderElectionService().isLeader()) {
+ leaders++;
+ }
+ }
+ assertEquals(leaders, 1);
+ }
+
+ @Override
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+ return changeDefaultMetadataCacheConfig(super.createLocalMetadataStore());
+ }
+
+ @Override
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+ return changeDefaultMetadataCacheConfig(super.createConfigurationMetadataStore());
+ }
+
+ MetadataStoreExtended changeDefaultMetadataCacheConfig(MetadataStoreExtended metadataStore) {
+ MetadataStoreExtended spy = spy(metadataStore);
+ when(spy.getDefaultMetadataCacheConfig()).thenReturn(MetadataCacheConfig
+ .builder()
+ .refreshAfterWriteMillis(REFRESH_AFTER_WRITE_MILLIS_IN_TEST)
+ .expireAfterWriteMillis(EXPIRE_AFTER_WRITE_MILLIS_IN_TEST)
+ .build());
+ return spy;
+ }
+
+ @Test
+ public void shouldAllBrokersBeAbleToGetTheLeaderAfterExpiration()
+ throws ExecutionException, InterruptedException, TimeoutException {
+
+ // if you want to see this test fail, modify the line in LeaderElectionImpl constructor for creating
+ // the metadata cache to not skip expirations:
+ // this.cache = store.getMetadataCache(clazz);
+
+ // Given that all brokers have the leader elected
+ Awaitility.await().untilAsserted(() -> {
+ for (PulsarService broker : getAllBrokers()) {
+ Optional<LeaderBroker> currentLeader = broker.getLeaderElectionService().getCurrentLeader();
+ assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+ }
+ });
+
+ // Wait for metadata cache entries to expire
+ Thread.sleep(EXPIRE_AFTER_WRITE_MILLIS_IN_TEST);
+
+ // then leader should be known on all brokers and it should be the same leader
+ LeaderBroker leader = null;
+ for (PulsarService broker : getAllBrokers()) {
+ Optional<LeaderBroker> currentLeader =
+ broker.getLeaderElectionService().readCurrentLeader().get(1, TimeUnit.SECONDS);
+ assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+ if (leader != null) {
+ assertEquals(currentLeader.get(), leader,
+ "Different leader on broker " + broker.getBrokerServiceUrl());
+ } else {
+ leader = currentLeader.get();
+ }
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 462b640c175..36ed1ca6c61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -34,55 +34,21 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.metadata.TestZKServer;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
-public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {
@Override
protected int numberOfAdditionalBrokers() {
return 9;
}
- TestZKServer testZKServer;
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- testZKServer = new TestZKServer();
- }
-
- @Override
- protected void onCleanup() {
- super.onCleanup();
- if (testZKServer != null) {
- try {
- testZKServer.close();
- } catch (Exception e) {
- log.error("Error in stopping ZK server", e);
- }
- }
- }
-
- @Override
- protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
- return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
- }
-
- @Override
- protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
- return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
- }
-
@Test
public void shouldElectOneLeader() {
int leaders = 0;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
new file mode 100644
index 00000000000..1671fd39956
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * The configuration builder for a {@link MetadataCache} config.
+ */
+@Builder
+@Getter
+@ToString
+public class MetadataCacheConfig {
+ private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+ /**
+ * Specifies that active entries are eligible for automatic refresh once a fixed duration has
+ * elapsed after the entry's creation, or the most recent replacement of its value.
+ * A negative or zero value disables automatic refresh.
+ */
+ @Builder.Default
+ private final long refreshAfterWriteMillis = DEFAULT_CACHE_REFRESH_TIME_MILLIS;
+
+ /**
+ * Specifies that each entry should be automatically removed from the cache once a fixed duration
+ * has elapsed after the entry's creation, or the most recent replacement of its value.
+ * A negative or zero value disables automatic expiration.
+ */
+ @Builder.Default
+ private final long expireAfterWriteMillis = 2 * DEFAULT_CACHE_REFRESH_TIME_MILLIS;
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index fa89d7e3dd2..b4295f25867 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -136,9 +136,23 @@ public interface MetadataStore extends AutoCloseable {
* @param <T>
* @param clazz
* the class type to be used for serialization/deserialization
+ * @param cacheConfig
+ * the cache configuration to be used
* @return the metadata cache object
*/
- <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+ <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig);
+
+ /**
+ * Create a metadata cache specialized for a specific class.
+ *
+ * @param <T>
+ * @param clazz
+ * the class type to be used for serialization/deserialization
+ * @return the metadata cache object
+ */
+ default <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+ return getMetadataCache(clazz, getDefaultMetadataCacheConfig());
+ }
/**
* Create a metadata cache specialized for a specific class.
@@ -146,9 +160,23 @@ public interface MetadataStore extends AutoCloseable {
* @param <T>
* @param typeRef
* the type ref description to be used for serialization/deserialization
+ * @param cacheConfig
+ * the cache configuration to be used
* @return the metadata cache object
*/
- <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
+ <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig);
+
+ /**
+ * Create a metadata cache specialized for a specific class.
+ *
+ * @param <T>
+ * @param typeRef
+ * the type ref description to be used for serialization/deserialization
+ * @return the metadata cache object
+ */
+ default <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+ return getMetadataCache(typeRef, getDefaultMetadataCacheConfig());
+ }
/**
* Create a metadata cache that uses a particular serde object.
@@ -156,7 +184,30 @@ public interface MetadataStore extends AutoCloseable {
* @param <T>
* @param serde
* the custom serialization/deserialization object
+ * @param cacheConfig
+ * the cache configuration to be used
* @return the metadata cache object
*/
- <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+ <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig);
+
+ /**
+ * Create a metadata cache that uses a particular serde object.
+ *
+ * @param <T>
+ * @param serde
+ * the custom serialization/deserialization object
+ * @return the metadata cache object
+ */
+ default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
+ return getMetadataCache(serde, getDefaultMetadataCacheConfig());
+ }
+
+ /**
+ * Returns the default metadata cache config.
+ *
+ * @return default metadata cache config
+ */
+ default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+ return MetadataCacheConfig.builder().build();
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 67fa7afd00f..2cbe9a6dc19 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
@@ -50,30 +51,32 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@Slf4j
public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {
-
- private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
-
@Getter
private final MetadataStore store;
private final MetadataSerde<T> serde;
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
- public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef) {
- this(store, new JSONMetadataSerdeTypeRef<>(typeRef));
+ public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+ this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig);
}
- public MetadataCacheImpl(MetadataStore store, JavaType type) {
- this(store, new JSONMetadataSerdeSimpleType<>(type));
+ public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig) {
+ this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig);
}
- public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
+ public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
this.store = store;
this.serde = serde;
- this.objCache = Caffeine.newBuilder()
- .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
- .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
+ Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
+ if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
+ cacheBuilder.refreshAfterWrite(cacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
+ }
+ if (cacheConfig.getExpireAfterWriteMillis() > 0) {
+ cacheBuilder.expireAfterWrite(cacheConfig.getExpireAfterWriteMillis(), TimeUnit.MILLISECONDS);
+ }
+ this.objCache = cacheBuilder
.buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {
@Override
public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index af873201e96..86fac33f7c2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@@ -32,6 +33,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -52,6 +54,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final Consumer<LeaderElectionState> stateChangesListener;
+ private final ScheduledFuture<?> updateCachedValueFuture;
private LeaderElectionState leaderElectionState;
private Optional<Long> version = Optional.empty();
@@ -73,7 +76,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
this.path = path;
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
this.store = store;
- this.cache = store.getMetadataCache(clazz);
+ MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
+ .expireAfterWriteMillis(-1L)
+ .build();
+ this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
this.leaderElectionState = LeaderElectionState.NoLeader;
this.internalState = InternalState.Init;
this.stateChangesListener = stateChangesListener;
@@ -81,6 +87,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
+ updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+ metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
+ metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
}
@Override
@@ -102,10 +111,13 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
} else {
return tryToBecomeLeader();
}
- }).thenCompose(leaderElectionState ->
- // make sure that the cache contains the current leader
- // so that getLeaderValueIfPresent works on all brokers
- cache.get(path).thenApply(__ -> leaderElectionState));
+ }).thenComposeAsync(leaderElectionState -> {
+ // make sure that the cache contains the current leader
+ // so that getLeaderValueIfPresent works on all brokers
+ cache.refresh(path);
+ return cache.get(path)
+ .thenApply(__ -> leaderElectionState);
+ }, executor);
}
private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
@@ -207,11 +219,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
// There was a conflict between 2 participants trying to become leaders at same time. Retry
// to fetch info on new leader.
- // We force the invalidation of the cache entry. Since we received a BadVersion error, we
- // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
- // leader election many times until we finally receive the notification that invalidates the
- // cache.
- cache.invalidate(path);
elect()
.thenAccept(lse -> result.complete(lse))
.exceptionally(ex2 -> {
@@ -229,6 +236,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
@Override
public void close() throws Exception {
+ updateCachedValueFuture.cancel(true);
try {
asyncClose().join();
} catch (CompletionException e) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index a116ea9c294..db9b05a8b14 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -123,23 +124,23 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
}
@Override
- public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+ public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
- TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+ TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig);
metadataCaches.add(metadataCache);
return metadataCache;
}
@Override
- public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
- MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef);
+ public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+ MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig);
metadataCaches.add(metadataCache);
return metadataCache;
}
@Override
- public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
- MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde);
+ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
+ MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig);
metadataCaches.add(metadataCache);
return metadataCache;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 679b49a740b..4972570654c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -31,6 +31,7 @@ import lombok.Data;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -146,18 +147,18 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended {
}
@Override
- public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
- return store.getMetadataCache(clazz);
+ public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
+ return store.getMetadataCache(clazz, cacheConfig);
}
@Override
- public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
- return store.getMetadataCache(typeRef);
+ public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
+ return store.getMetadataCache(typeRef, cacheConfig);
}
@Override
- public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
- return store.getMetadataCache(serde);
+ public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
+ return store.getMetadataCache(serde, cacheConfig);
}
@Override