Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/01 12:47:58 UTC

[GitHub] [pulsar] lhotari opened a new pull request, #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

lhotari opened a new pull request, #17401:
URL: https://github.com/apache/pulsar/pull/17401

   ### Motivation
   
   - Fixes an issue where the leader broker information isn't available and this warning is logged:
     `WARN  org.apache.pulsar.broker.namespace.NamespaceService - The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way.`
   
   - Possibly fixes:
     - Issue #13584, which is a result of brokers making decentralized decisions without the leader broker
     - Issues whose symptoms are topic redirection problems with the admin API, for example #13946
   
   The root cause of the problem is that leader election cache entries currently expire after 10 minutes. An entry is refreshed only when there is a cache hit between 5 and 10 minutes after the entry was added to the cache or last refreshed. If nothing reads the entry in that window (for example, while the brokers are idling), the entry expires. This expiration behavior currently cannot be configured in `MetadataCache`.
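   The following minimal, self-contained Java sketch makes the timing concrete. It shows how a 5-minute refresh-after-write and a 10-minute expire-after-write interact, as described above. It does not use `MetadataCache` or Caffeine. `CacheTimingSketch` and `SimulatedEntry` are hypothetical names, time is passed in explicitly as minutes, and a successful refresh is modeled by resetting the entry's timestamp.

   ```java
   import java.util.Optional;

   final class CacheTimingSketch {
       // Assumed values from the description above: refresh after 5 minutes, expire after 10.
       static final long REFRESH_AFTER_MINUTES = 5;
       static final long EXPIRE_AFTER_MINUTES = 10;

       /** Hypothetical cache entry that remembers when it was last written or refreshed. */
       static final class SimulatedEntry {
           private final String value;
           private long writtenAtMinute;

           SimulatedEntry(String value, long writtenAtMinute) {
               this.value = value;
               this.writtenAtMinute = writtenAtMinute;
           }

           /**
            * Simulates a non-blocking "if present" read at the given time.
            * Returns empty once the entry has expired. A hit inside the refresh window
            * returns the value and resets the timestamp, modeling a successful reload.
            */
           Optional<String> readIfPresent(long nowMinute) {
               long age = nowMinute - writtenAtMinute;
               if (age >= EXPIRE_AFTER_MINUTES) {
                   return Optional.empty(); // expired: the leader information is gone
               }
               if (age >= REFRESH_AFTER_MINUTES) {
                   writtenAtMinute = nowMinute; // this hit triggers a refresh
               }
               return Optional.of(value);
           }
       }

       public static void main(String[] args) {
           // Reads every few minutes keep the entry alive.
           SimulatedEntry busy = new SimulatedEntry("broker-1", 0);
           System.out.println(busy.readIfPresent(6));  // Optional[broker-1], refreshed at minute 6
           System.out.println(busy.readIfPresent(14)); // Optional[broker-1], age 8, refreshed again

           // A broker that idles and reads again after 11 minutes finds nothing.
           SimulatedEntry idle = new SimulatedEntry("broker-1", 0);
           System.out.println(idle.readIfPresent(11)); // Optional.empty
       }
   }
   ```

   In this model, an idle broker whose first read comes more than 10 minutes after the last write gets no value. That would be consistent with the warning quoted above.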
   
   
   ### Modifications
   
   Since expiring leader election cache entries leads to these problems, it is better to disable expiration completely for leader election entries. Cache entry refreshing is unchanged, so a missed ZooKeeper change notification won't cause the cache to keep serving stale values.
   
   - Change `MetadataStore`:
     - Add support for passing a cache configuration when creating a `MetadataCache`, so that expiration can be disabled for the leader election cache.
   - Revisit the expiration logic in `LeaderElectionImpl`: use `cache.refresh` instead of removing entries from the cache.
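   The following minimal sketch shows the intended effect of the configuration change. It assumes, as the `expireAfterWriteMillis(-1L)` setting in the `LeaderElectionImpl` diff suggests, that a non-positive expiry value disables expiration while refresh-after-write keeps applying. `NoExpirySketch`, `ReadOutcome`, and `read` are hypothetical names, not the `MetadataCacheImpl` API.

   ```java
   import java.util.concurrent.TimeUnit;

   final class NoExpirySketch {
       // Assumed refresh interval from the description above.
       static final long REFRESH_AFTER_WRITE_MILLIS = TimeUnit.MINUTES.toMillis(5);

       /** What a read of an entry of a given age would do under an expiry setting. */
       enum ReadOutcome { FRESH_HIT, HIT_AND_REFRESH, MISS_EXPIRED }

       /**
        * Assumed convention: a non-positive expireAfterWriteMillis (for example -1)
        * disables expiration, while refresh-after-write still applies.
        */
       static ReadOutcome read(long ageMillis, long expireAfterWriteMillis) {
           if (expireAfterWriteMillis > 0 && ageMillis >= expireAfterWriteMillis) {
               return ReadOutcome.MISS_EXPIRED;
           }
           return ageMillis >= REFRESH_AFTER_WRITE_MILLIS ? ReadOutcome.HIT_AND_REFRESH : ReadOutcome.FRESH_HIT;
       }

       public static void main(String[] args) {
           long oneHour = TimeUnit.HOURS.toMillis(1);
           long tenMinutes = TimeUnit.MINUTES.toMillis(10);
           System.out.println(read(oneHour, tenMinutes));               // MISS_EXPIRED: old behavior after idling
           System.out.println(read(oneHour, -1L));                      // HIT_AND_REFRESH: value kept, refresh triggered
           System.out.println(read(TimeUnit.MINUTES.toMillis(1), -1L)); // FRESH_HIT
       }
   }
   ```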
   
   ### Additional context
   
   - The leader broker value is intentionally read in a non-blocking way. An alternative fix, #17254, also retrieves the value from the metadata store in this case, when it isn't cached. That approach was abandoned because the correct solution is to not expire the leader election cache at all. Thank you, @michaeljmarshall, for recommending this approach.
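   The following rough sketch illustrates the trade-off. It contrasts a non-blocking "if present" read, which answers only from a local map, with a variant that falls back to an asynchronous store read when the value is missing. The fallback is similar in spirit to what #17254 is described as doing. `LeaderLookupSketch`, `getIfPresent`, `getOrLoad`, the `Function`-based store, and the `/example/leader` path are all hypothetical stand-ins, not Pulsar APIs.

   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   final class LeaderLookupSketch {
       private final Map<String, String> cache = new ConcurrentHashMap<>();
       // Hypothetical asynchronous metadata store read: path -> future of an optional value.
       private final Function<String, CompletableFuture<Optional<String>>> storeRead;

       LeaderLookupSketch(Function<String, CompletableFuture<Optional<String>>> storeRead) {
           this.storeRead = storeRead;
       }

       /** Non-blocking: answers only from the local cache and never touches the store. */
       Optional<String> getIfPresent(String path) {
           return Optional.ofNullable(cache.get(path));
       }

       /** Falls back to the store when the cache has no value, and caches what it finds. */
       CompletableFuture<Optional<String>> getOrLoad(String path) {
           String cached = cache.get(path);
           if (cached != null) {
               return CompletableFuture.completedFuture(Optional.of(cached));
           }
           return storeRead.apply(path).thenApply(value -> {
               value.ifPresent(v -> cache.put(path, v));
               return value;
           });
       }

       public static void main(String[] args) {
           LeaderLookupSketch lookup = new LeaderLookupSketch(
                   path -> CompletableFuture.completedFuture(Optional.of("broker-1")));
           System.out.println(lookup.getIfPresent("/example/leader"));     // Optional.empty
           System.out.println(lookup.getOrLoad("/example/leader").join()); // Optional[broker-1]
           System.out.println(lookup.getIfPresent("/example/leader"));     // Optional[broker-1]
       }
   }
   ```

   With a cache that never expires, the plain non-blocking read stays sufficient once the value has been loaded. That is the reasoning behind preferring this PR over the fallback approach.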
   
   - [x] `doc-not-needed` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960670313


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param clazz
      *            the class type to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+    <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param clazz
+     *            the class type to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+        return getMetadataCache(clazz, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache specialized for a specific class.
      *
      * @param <T>
      * @param typeRef
      *            the type ref description to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
+    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param typeRef
+     *            the type ref description to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+        return getMetadataCache(typeRef, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache that uses a particular serde object.
      *
      * @param <T>
      * @param serde
      *            the custom serialization/deserialization object
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache that uses a particular serde object.
+     *
+     * @param <T>
+     * @param serde
+     *            the custom serialization/deserialization object
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
+        return getMetadataCache(serde, getDefaultMetadataCacheConfig());
+    }
+
+    /**
+     * Returns the default metadata cache config.
+     *
+     * @return default metadata cache config
+     */
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   it's better to create a constant object
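   The following minimal sketch illustrates this suggestion. Fields declared in a Java interface are implicitly `public static final`, so the default config can be built once and shared by every overload that omits it. `CacheConfig`, `CacheFactory`, and `DEFAULT_CACHE_CONFIG` are hypothetical names. Sharing one instance assumes the config is immutable.

   ```java
   final class DefaultConfigConstantSketch {
       /** Hypothetical immutable config: fields are final and set once. */
       static final class CacheConfig {
           final long refreshAfterWriteMillis;
           final long expireAfterWriteMillis;

           CacheConfig(long refreshAfterWriteMillis, long expireAfterWriteMillis) {
               this.refreshAfterWriteMillis = refreshAfterWriteMillis;
               this.expireAfterWriteMillis = expireAfterWriteMillis;
           }
       }

       interface CacheFactory {
           // Implicitly public static final: one shared instance for all callers.
           CacheConfig DEFAULT_CACHE_CONFIG = new CacheConfig(300_000L, 600_000L);

           String createCache(String name, CacheConfig config);

           // The default overload reuses the constant instead of allocating a new config per call.
           default String createCache(String name) {
               return createCache(name, DEFAULT_CACHE_CONFIG);
           }
       }

       public static void main(String[] args) {
           CacheFactory factory = (name, config) ->
                   name + " expires after " + config.expireAfterWriteMillis + " ms";
           System.out.println(factory.createCache("leader")); // leader expires after 600000 ms
       }
   }
   ```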





[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r966609766


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -73,14 +76,20 @@ private enum InternalState {
         this.path = path;
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
         this.store = store;
-        this.cache = store.getMetadataCache(clazz);
+        MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
+                .expireAfterWriteMillis(-1L)
+                .build();
+        this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
         this.executor = executor;
 
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
+        updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+                metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
+                metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);

Review Comment:
   Yes. With `refreshAfter`, the first call after the refresh interval gets a stale value, and this periodic refresh prevents that. I made this change based on the Pulsar community meeting discussion where Matteo explained the reasons for the previous expiration logic.
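   The scheduling in the diff above can be sketched with the JDK's `ScheduledExecutorService` alone. The sketch starts the first refresh at half the refresh interval, as the diff does with `getRefreshAfterWriteMillis() / 2`, and repeats every full interval after that. It wraps the task so that an exception does not cancel later runs. Per the JDK documentation, an exception thrown by a `scheduleWithFixedDelay` task suppresses subsequent executions, so the wrapper plays roughly the role that `SafeRunnable.safeRun` appears to play in the diff. `PeriodicRefreshSketch`, `safe`, and `scheduleRefresh` are hypothetical names, and the demo uses a 20 ms interval so it finishes quickly.

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   final class PeriodicRefreshSketch {
       /** Wraps a task so a thrown exception is logged instead of cancelling the schedule. */
       static Runnable safe(Runnable task) {
           return () -> {
               try {
                   task.run();
               } catch (RuntimeException e) {
                   System.err.println("refresh failed: " + e);
               }
           };
       }

       /** First run after half the interval, then every full interval after the previous run ends. */
       static ScheduledFuture<?> scheduleRefresh(ScheduledExecutorService executor,
                                                 Runnable refreshLeaderValue,
                                                 long refreshAfterWriteMillis) {
           return executor.scheduleWithFixedDelay(safe(refreshLeaderValue),
                   refreshAfterWriteMillis / 2, refreshAfterWriteMillis, TimeUnit.MILLISECONDS);
       }

       public static void main(String[] args) throws InterruptedException {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           AtomicInteger runs = new AtomicInteger();
           // Hypothetical refresh task that fails on its first run; later runs still happen.
           Runnable refresh = () -> {
               if (runs.incrementAndGet() == 1) {
                   throw new IllegalStateException("metadata store unavailable");
               }
           };
           ScheduledFuture<?> future = scheduleRefresh(executor, refresh, 20);
           Thread.sleep(200);
           future.cancel(false); // stop refreshing, e.g. when the leader election is closed
           executor.shutdown();
           System.out.println("refresh ran " + runs.get() + " times");
       }
   }
   ```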





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

michaeljmarshall commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r966615232


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -207,11 +219,6 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
                         // There was a conflict between 2 participants trying to become leaders at same time. Retry
                         // to fetch info on new leader.
 
-                        // We force the invalidation of the cache entry. Since we received a BadVersion error, we
-                        // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
-                        // leader election many times until we finally receive the notification that invalidates the
-                        // cache.
-                        cache.invalidate(path);

Review Comment:
   The `elect()` method calls `cache.refresh(path);` only after a callback from this method (`handleExistingLeaderValue()`) has completed. If we don't invalidate the cache, I think we'll recursively call `handleExistingLeaderValue()`, as the comment indicates, because `store.get(path)` will return a non-empty result.





[GitHub] [pulsar] mattisonchao commented on pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

mattisonchao commented on PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#issuecomment-1244817288

   Hello @lhotari 
   It looks like there are many conflicts when cherry-picking this to branch-2.9.
   Would you mind helping cherry-pick it to branch-2.9? (To avoid introducing bugs while resolving the conflicts.)




[GitHub] [pulsar] lhotari merged pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

lhotari merged PR #17401:
URL: https://github.com/apache/pulsar/pull/17401




[GitHub] [pulsar] eolivelli commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

eolivelli commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r961591322


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   agreed





[GitHub] [pulsar] eolivelli commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

eolivelli commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960625291


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -102,10 +105,13 @@ private synchronized CompletableFuture<LeaderElectionState> elect() {
             } else {
                 return tryToBecomeLeader();
             }
-        }).thenCompose(leaderElectionState ->
-                // make sure that the cache contains the current leader
-                // so that getLeaderValueIfPresent works on all brokers
-                cache.get(path).thenApply(__ -> leaderElectionState));
+        }).thenComposeAsync(leaderElectionState -> {
+            // make sure that the cache contains the current leader
+            // so that getLeaderValueIfPresent works on all brokers
+            cache.refresh(path);

Review Comment:
   Is this a blocking operation?





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

michaeljmarshall commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r964188907


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -73,14 +76,20 @@ private enum InternalState {
         this.path = path;
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
         this.store = store;
-        this.cache = store.getMetadataCache(clazz);
+        MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
+                .expireAfterWriteMillis(-1L)
+                .build();
+        this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
         this.executor = executor;
 
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
+        updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+                metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
+                metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);

Review Comment:
   A call to `getLeaderValueIfPresent` will asynchronously trigger the `refreshAfter` logic in the loading cache. Is there a reason it is insufficient to rely on those calls?



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -207,11 +219,6 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
                         // There was a conflict between 2 participants trying to become leaders at same time. Retry
                         // to fetch info on new leader.
 
-                        // We force the invalidation of the cache entry. Since we received a BadVersion error, we
-                        // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
-                        // leader election many times until we finally receive the notification that invalidates the
-                        // cache.
-                        cache.invalidate(path);

Review Comment:
   If we don't invalidate the value here, I think we risk recursively retrying to acquire the lock, just as the comment indicates. When the cached value is `None` and the actual value is another broker, this broker will keep getting `BadVersionException` without completing any of the `result` objects.
   
   This might be a place where the implementation in #17254 is better, because an empty cache results in a read to the metadata store. I am wondering if it's worth combining both PRs to reduce edge cases during leader election where the current broker doesn't know the real leader.
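   The following simplified sketch illustrates the concern. It assumes a store that rejects a write whose expected version doesn't match, analogous to a `BadVersionException`. A participant that keeps deciding from a stale cached "no leader" view fails on every retry. A participant that drops its view after a conflict and re-reads the store discovers the existing leader. `VersionedStoreSketch`, `BadVersion`, `compareAndSet`, and `settleLeader` are hypothetical. The sketch does not model Pulsar's actual `elect()` flow, which, as noted later in this thread, reads through `store.get`.

   ```java
   import java.util.Optional;

   final class VersionedStoreSketch {
       /** Thrown when a write's expected version does not match the stored version. */
       static final class BadVersion extends Exception {
           BadVersion(String message) {
               super(message);
           }
       }

       /** A value with the version it was read at; a null value means "no leader". */
       static final class Versioned {
           final String value;
           final long version;

           Versioned(String value, long version) {
               this.value = value;
               this.version = version;
           }
       }

       private String value;      // current leader, or null if none
       private long version = -1; // -1 means the leader node does not exist yet

       synchronized Versioned read() {
           return new Versioned(value, version);
       }

       /** Writes only if expectedVersion matches the stored version; returns the new version. */
       synchronized long compareAndSet(String newValue, long expectedVersion) throws BadVersion {
           if (expectedVersion != version) {
               throw new BadVersion("expected " + expectedVersion + " but was " + version);
           }
           value = newValue;
           return ++version;
       }

       /** Returns the leader this participant settles on, or empty if it gives up. */
       static Optional<String> settleLeader(VersionedStoreSketch store, Versioned cachedView,
                                            String self, boolean invalidateOnConflict, int maxAttempts) {
           Versioned view = cachedView;
           for (int attempt = 0; attempt < maxAttempts; attempt++) {
               if (view.value != null) {
                   return Optional.of(view.value); // a leader is known: follow it
               }
               try {
                   store.compareAndSet(self, view.version); // try to become the leader
                   return Optional.of(self);
               } catch (BadVersion e) {
                   if (invalidateOnConflict) {
                       view = store.read(); // drop the stale view and re-read from the store
                   }
                   // otherwise retry with the same stale view
               }
           }
           return Optional.empty();
       }

       public static void main(String[] args) throws BadVersion {
           VersionedStoreSketch store = new VersionedStoreSketch();
           store.compareAndSet("broker-1", -1);           // broker-1 wins first
           Versioned staleView = new Versioned(null, -1); // cached "no leader" view

           System.out.println(settleLeader(store, staleView, "broker-2", false, 5)); // Optional.empty
           System.out.println(settleLeader(store, staleView, "broker-2", true, 5));  // Optional[broker-1]
       }
   }
   ```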





[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960727229


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -102,10 +105,13 @@ private synchronized CompletableFuture<LeaderElectionState> elect() {
             } else {
                 return tryToBecomeLeader();
             }
-        }).thenCompose(leaderElectionState ->
-                // make sure that the cache contains the current leader
-                // so that getLeaderValueIfPresent works on all brokers
-                cache.get(path).thenApply(__ -> leaderElectionState));
+        }).thenComposeAsync(leaderElectionState -> {
+            // make sure that the cache contains the current leader
+            // so that getLeaderValueIfPresent works on all brokers
+            cache.refresh(path);

Review Comment:
   It is. That's why I changed it to run within `thenComposeAsync`.
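   The following sketch illustrates why `thenComposeAsync` helps here. With `thenCompose`, the function may run on whichever thread completes the previous stage, so a blocking call inside it would hold that thread. `thenComposeAsync` with an executor runs the function on that executor instead. `ComposeAsyncSketch`, `blockingRefresh`, and the `leader-election` thread name are hypothetical. `blockingRefresh` stands in for `cache.refresh(path)`, and the single-thread executor stands in for the leader election executor, which is an assumption about how the diff continues past the quoted lines.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   final class ComposeAsyncSketch {
       /** Hypothetical stand-in for a blocking cache refresh; returns the thread it ran on. */
       static String blockingRefresh(String path) {
           try {
               Thread.sleep(50); // simulate waiting on the metadata store
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           return Thread.currentThread().getName();
       }

       static CompletableFuture<String> electThenRefresh(CompletableFuture<String> election,
                                                          ExecutorService executor, String path) {
           // The refresh runs on `executor`, not on the thread that completes `election`.
           return election.thenComposeAsync(state -> {
               String refreshThread = blockingRefresh(path);
               return CompletableFuture.completedFuture(state + " (refreshed on " + refreshThread + ")");
           }, executor);
       }

       public static void main(String[] args) {
           ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "leader-election"));
           CompletableFuture<String> election = new CompletableFuture<>();
           CompletableFuture<String> result = electThenRefresh(election, executor, "/example/leader");
           election.complete("Following"); // the completing thread returns without running the refresh
           System.out.println(result.join()); // Following (refreshed on leader-election)
           executor.shutdown();
       }
   }
   ```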





[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r966673888


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -207,11 +219,6 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
                         // There was a conflict between 2 participants trying to become leaders at same time. Retry
                         // to fetch info on new leader.
 
-                        // We force the invalidation of the cache entry. Since we received a BadVersion error, we
-                        // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
-                        // leader election many times until we finally receive the notification that invalidates the
-                        // cache.
-                        cache.invalidate(path);

Review Comment:
   It's sufficient to call `cache.refresh(path)` after the election has been completed.
   The cached value is not used in any way in the leader election code path. That's why I don't think it would make any difference. `cache.get` is currently called in `tryToBecomeLeader`, but that seems unnecessary and should be removed.
   





[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r966610950


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -207,11 +219,6 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
                         // There was a conflict between 2 participants trying to become leaders at same time. Retry
                         // to fetch info on new leader.
 
-                        // We force the invalidation of the cache entry. Since we received a BadVersion error, we
-                        // already know that the entry is out of date. If we don't invalidate, we'd be retrying the
-                        // leader election many times until we finally receive the notification that invalidates the
-                        // cache.
-                        cache.invalidate(path);

Review Comment:
   This is taken care of in the `elect` method. It uses `store.get` and bypasses the cache. There's `cache.refresh(path);` to invalidate the stale value.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

eolivelli commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960820391


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   It looks to me like `MetadataCacheConfig` is immutable (it has only `final` fields).





[GitHub] [pulsar] eolivelli commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

eolivelli commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960820926


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   We can make it "final" in order to prevent subclassing.
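The diff keeps the old single-argument `getMetadataCache` signatures as `default` methods that delegate to the new two-argument ones. Below is a minimal sketch of that delegation pattern. It uses hypothetical `CacheConfig`, `Cache`, `Store` and `InMemoryStore` types rather than Pulsar's real classes. The 10-minute default and the use of `-1` for "never expire" are assumptions chosen to mirror the PR description.

```java
import java.util.Objects;

// Hypothetical config holder (not Pulsar's MetadataCacheConfig).
// -1 is assumed here to mean "never expire", mirroring the PR's expireAfterWriteMillis(-1L).
final class CacheConfig {
    final long expireAfterWriteMillis;

    CacheConfig(long expireAfterWriteMillis) {
        this.expireAfterWriteMillis = expireAfterWriteMillis;
    }
}

// Hypothetical cache that only records how it was configured.
final class Cache<T> {
    final Class<T> type;
    final CacheConfig config;

    Cache(Class<T> type, CacheConfig config) {
        this.type = Objects.requireNonNull(type);
        this.config = Objects.requireNonNull(config);
    }
}

interface Store {
    // The only method an implementation has to provide.
    <T> Cache<T> getCache(Class<T> clazz, CacheConfig cacheConfig);

    // Old single-argument signature, kept for existing callers; it delegates
    // to the new overload with the default configuration.
    default <T> Cache<T> getCache(Class<T> clazz) {
        return getCache(clazz, getDefaultCacheConfig());
    }

    // Builds a new default config on every call (10 minutes is an assumed value).
    default CacheConfig getDefaultCacheConfig() {
        return new CacheConfig(10 * 60 * 1000L);
    }
}

class InMemoryStore implements Store {
    @Override
    public <T> Cache<T> getCache(Class<T> clazz, CacheConfig cacheConfig) {
        return new Cache<>(clazz, cacheConfig);
    }
}
```

Only the two-argument method has to be implemented. Existing callers of the one-argument method keep compiling and pick up the default config.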



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r961588142


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param clazz
      *            the class type to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+    <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param clazz
+     *            the class type to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+        return getMetadataCache(clazz, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache specialized for a specific class.
      *
      * @param <T>
      * @param typeRef
      *            the type ref description to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
+    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param typeRef
+     *            the type ref description to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+        return getMetadataCache(typeRef, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache that uses a particular serde object.
      *
      * @param <T>
      * @param serde
      *            the custom serialization/deserialization object
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache that uses a particular serde object.
+     *
+     * @param <T>
+     * @param serde
+     *            the custom serialization/deserialization object
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
+        return getMetadataCache(serde, getDefaultMetadataCacheConfig());
+    }
+
+    /**
+     * Returns the default metadata cache config.
+     *
+     * @return default metadata cache config
+     */
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   I tried extracting a constant. That doesn't make sense, since it would be exposed on the interface as a public field; it's not possible to have private fields on a public interface.
   That's why it's better not to introduce a constant field on the interface at all.
   This isn't a bottleneck in the code, so there's no need for a constant: the MetadataCache instances are created once, at startup time.
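The point about interface constants can be checked directly. A field declared on a Java interface is implicitly `public static final`, whatever modifiers are written. The small sketch below uses a hypothetical `ConfiguredStore` interface, a hypothetical `PlainHolder` class and an `InterfaceFieldCheck` helper (none of them part of Pulsar) to verify this via reflection. It also shows that a class, unlike an interface, can keep its constant private.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical interface: no modifiers are written on the field, yet the
// compiler makes it public, static and final because it is declared on an interface.
interface ConfiguredStore {
    long DEFAULT_EXPIRE_MILLIS = 600_000L;
}

// Hypothetical class for contrast: a class can hide its constant.
final class PlainHolder {
    private static final long HIDDEN_DEFAULT = 600_000L;
}

final class InterfaceFieldCheck {
    // Returns true if the named field is public, static and final; false if it
    // is missing or lacks any of the three modifiers.
    static boolean isPublicStaticFinal(Class<?> type, String fieldName) {
        try {
            Field field = type.getDeclaredField(fieldName);
            int mods = field.getModifiers();
            return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    private InterfaceFieldCheck() {
    }
}
```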





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r966613780


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java:
##########
@@ -73,14 +76,20 @@ private enum InternalState {
         this.path = path;
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
         this.store = store;
-        this.cache = store.getMetadataCache(clazz);
+        MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
+                .expireAfterWriteMillis(-1L)
+                .build();
+        this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
         this.executor = executor;
 
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
+        updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+                metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
+                metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);

Review Comment:
   My main thought is that with this loop, it's technically not necessary to remove the expiration logic from the cache because we're artificially keeping the value in the cache here. It's going to run pretty infrequently, so I don't think it's necessary to change it, but it does seem like an indirect way to handle the potential for "missed zk notifications".
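Below is a minimal sketch of the scheduling used in the diff. It assumes a plain `ScheduledExecutorService` and a hypothetical `PeriodicRefresher` class in place of `LeaderElectionImpl`. The first refresh runs after half of `refreshAfterWriteMillis`, and later ones run every `refreshAfterWriteMillis`. Exceptions are caught inside the task because `scheduleWithFixedDelay` suppresses all subsequent runs once a run throws. The diff wraps the task with `SafeRunnable.safeRun`, presumably for the same reason.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for the refresh loop added to LeaderElectionImpl.
final class PeriodicRefresher implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger successfulRefreshes = new AtomicInteger();
    private final ScheduledFuture<?> future;

    PeriodicRefresher(long refreshAfterWriteMillis, Runnable refresh) {
        // First run after half the interval, then a fixed delay of one interval
        // between the end of one run and the start of the next.
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                refresh.run();
                successfulRefreshes.incrementAndGet();
            } catch (RuntimeException e) {
                // Swallowed on purpose: a task that throws would otherwise
                // cancel every later run of scheduleWithFixedDelay.
            }
        }, refreshAfterWriteMillis / 2, refreshAfterWriteMillis, TimeUnit.MILLISECONDS);
    }

    // Polls until at least `count` refreshes have succeeded or the timeout elapses.
    boolean awaitRefreshes(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (successfulRefreshes.get() < count) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
        }
        return true;
    }

    @Override
    public void close() {
        future.cancel(false);
        executor.shutdownNow();
    }
}
```

As the review comment notes, a loop like this keeps the cached value warm independently of the cache's own expiration settings.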





[GitHub] [pulsar] srivatsav commented on pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes

Posted by GitBox <gi...@apache.org>.
srivatsav commented on PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#issuecomment-1306990837

   @lhotari Here are the stack traces of both errors.
   
   Pulsar version: 2.9.2
   
   `WARN org.apache.pulsar.broker.namespace.NamespaceService - The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way.`
   
   ```
   
   2022-11-08T09:54:38,651+0000 [pulsar-web-38-4] WARN  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Failed to create namespace ALGRUS/JobSync - already exists
   2022-11-08T09:54:38,651+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://TQL1US/JobSync/jobevents-ack-partition-0][flink-pulsar-dd2186ba-8355-4536-aaa2-056d7bae6dda] Successfully disconnected consumers from subscription, proceeding with cursor reset
   2022-11-08T09:54:38,651+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [TQL1US/JobSync/persistent/jobevents-ack-partition-0] Opened new cursor: ManagedCursorImpl{ledger=TQL1US/JobSync/persistent/jobevents-ack-partition-0, name=flink-pulsar-dd2186ba-8355-4536-aaa2-056d7bae6dda, ackPos=3504667:-1, readPos=3504667:0}
   2022-11-08T09:54:38,651+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [TQL1US/JobSync/persistent/jobevents-ack-partition-0] Updated cursor flink-pulsar-dd2186ba-8355-4536-aaa2-056d7bae6dda with ledger id 3509778 md-position=3504667:-1 rd-position=3504667:0
   2022-11-08T09:54:38,649+0000 [pulsar-2-1] WARN  org.apache.pulsar.broker.namespace.NamespaceService - The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way. NamespaceBundle[ASCAUS/JobSync/0x00000000_0x40000000]
   2022-11-08T09:54:38,649+0000 [pulsar-2-2] WARN  org.apache.pulsar.broker.namespace.NamespaceService - The information about the current leader broker wasn't available. Handling load manager decisions in a decentralized way. NamespaceBundle[AEROUS/JobSync/0xc0000000_0xffffffff]
   ```
   
   `WARN Failed to acquire ownership for namespace bundle`
   ```
   java.util.concurrent.CompletionException: org.apache.pulsar.broker.PulsarServerException: Failed to acquire ownership for namespace bundle CACIGLOBAL/JobSync/0x00000000_0x40000000
   	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.broker.namespace.NamespaceService.lambda$searchForCandidateBroker$15(NamespaceService.java:578) ~[org.apache.pulsar-pulsar-broker-2.9.2.jar:2.9.2]
   	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$acquireLock$2(LockManagerImpl.java:111) ~[org.apache.pulsar-pulsar-metadata-2.9.2.jar:2.9.2]
   	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquire$4(ResourceLockImpl.java:134) ~[org.apache.pulsar-pulsar-metadata-2.9.2.jar:2.9.2]
   	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$get$7(ZKMetadataStore.java:139) ~[org.apache.pulsar-pulsar-metadata-2.9.2.jar:2.9.2]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.apache.pulsar.broker.PulsarServerException: Failed to acquire ownership for namespace bundle CACIGLOBAL/JobSync/0x00000000_0x40000000
   	... 20 more
   Caused by: java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$LockBusyException: Resource at /namespace/CACIGLOBAL/JobSync/0x00000000_0x40000000 is already locked
   	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
   	... 17 more
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$LockBusyException: Resource at /namespace/CACIGLOBAL/JobSync/0x00000000_0x40000000 is already locked
   	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$doRevalidate$20(ResourceLockImpl.java:297) ~[org.apache.pulsar-pulsar-metadata-2.9.2.jar:2.9.2]
   	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
   	... 7 more
   ```




[GitHub] [pulsar] github-actions[bot] commented on pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#issuecomment-1234283097

   @lhotari Please provide a correct documentation label for your PR.
   For instructions, see the [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).




[GitHub] [pulsar] lhotari commented on a diff in pull request #17401: [fix][broker] Fix issue where leader broker information isn't available after 10 minutes of idling

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17401:
URL: https://github.com/apache/pulsar/pull/17401#discussion_r960746886


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java:
##########
@@ -136,27 +136,78 @@ public interface MetadataStore extends AutoCloseable {
      * @param <T>
      * @param clazz
      *            the class type to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+    <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param clazz
+     *            the class type to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+        return getMetadataCache(clazz, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache specialized for a specific class.
      *
      * @param <T>
      * @param typeRef
      *            the type ref description to be used for serialization/deserialization
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
+    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param typeRef
+     *            the type ref description to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+        return getMetadataCache(typeRef, getDefaultMetadataCacheConfig());
+    }
 
     /**
      * Create a metadata cache that uses a particular serde object.
      *
      * @param <T>
      * @param serde
      *            the custom serialization/deserialization object
+     * @param cacheConfig
+     *          the cache configuration to be used
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig);
+
+    /**
+     * Create a metadata cache that uses a particular serde object.
+     *
+     * @param <T>
+     * @param serde
+     *            the custom serialization/deserialization object
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
+        return getMetadataCache(serde, getDefaultMetadataCacheConfig());
+    }
+
+    /**
+     * Returns the default metadata cache config.
+     *
+     * @return default metadata cache config
+     */
+    default MetadataCacheConfig getDefaultMetadataCacheConfig() {
+        return MetadataCacheConfig.builder().build();

Review Comment:
   `MetadataCacheConfig` isn't an immutable object, which is why I don't think it's useful to use a constant.
   I followed the same pattern for `MetadataCacheConfig` as is used for `MetadataStoreConfig`, which is mutable.
   
   Sharing a mutable object for multiple purposes isn't recommended. This isn't a performance bottleneck either, so I think it's better to leave it as it is.
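The small sketch below illustrates the hazard with a shared mutable default. It uses a hypothetical `MutableCacheConfig`, which is not Pulsar's `MetadataCacheConfig`, and its 5-minute refresh default is an assumption. A setter call on a shared `static final` instance is seen by every later caller. A method that builds a new instance on each call, as the default method in the diff does with `builder().build()`, keeps callers isolated.

```java
// Hypothetical mutable config; the 5-minute default is an illustrative assumption.
final class MutableCacheConfig {
    private long refreshAfterWriteMillis = 5 * 60 * 1000L;

    long getRefreshAfterWriteMillis() {
        return refreshAfterWriteMillis;
    }

    void setRefreshAfterWriteMillis(long millis) {
        this.refreshAfterWriteMillis = millis;
    }
}

final class ConfigSources {
    // "final" only fixes the reference; the object behind it can still be mutated.
    private static final MutableCacheConfig SHARED_DEFAULT = new MutableCacheConfig();

    // Returns the same instance every time: one caller's change leaks to all others.
    static MutableCacheConfig sharedDefault() {
        return SHARED_DEFAULT;
    }

    // Returns a new instance every time, so changes stay local to the caller.
    static MutableCacheConfig freshDefault() {
        return new MutableCacheConfig();
    }

    private ConfigSources() {
    }
}
```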
   


