You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/03 17:28:59 UTC

[pulsar] branch branch-2.9 updated (da03d89 -> c33b53a)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from da03d89  [Functions][k8s] Expose function container metrics port (#12065)
     new 06c344f  [Java Client] Make Audience Field Optional in OAuth2 Client Credentials (#11988)
     new a40eda0  [Functions] Fix classloader leaks (#12973)
     new b95c6f3  [Broker] Fix and improve topic ownership assignment (#13069)
     new c33b53a  Update lombok to 1.18.22 (#12466)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   2 +-
 pulsar-broker/pom.xml                              |   8 ++
 .../pulsar/broker/namespace/NamespaceService.java  | 104 +++++++++-----
 .../common/naming/NamespaceBundleFactory.java      |  32 ++++-
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  11 +-
 .../loadbalance/MultiBrokerLeaderElectionTest.java |  97 +++++++++++++
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |   6 +-
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   4 +-
 .../pulsar/client/impl/auth/oauth2/README.md       |   4 +-
 .../impl/auth/oauth2/protocol/TokenClient.java     |  25 ++--
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java |  13 ++
 .../impl/auth/oauth2/protocol/TokenClientTest.java |  18 +--
 .../pulsar/common/util/ClassLoaderUtils.java       |  14 ++
 .../pulsar/functions/utils/FunctionCommon.java     | 151 ++++++++++++---------
 .../functions/worker/rest/api/SinksImpl.java       |  32 +++--
 .../functions/worker/rest/api/SourcesImpl.java     |  33 +++--
 pulsar-metadata/pom.xml                            |  11 ++
 site2/docs/security-oauth2.md                      |   4 +-
 site2/website-next/docs/security-oauth2.md         |   4 +-
 20 files changed, 409 insertions(+), 170 deletions(-)

[pulsar] 02/04: [Functions] Fix classloader leaks (#12973)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a40eda0262822a7c4c9b556b72822dedcd1179eb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 13:37:21 2021 +0200

    [Functions] Fix classloader leaks (#12973)
    
    * Fix classloader leak in FunctionCommon.getClassLoaderFromPackage
    
    * Fix classloader leak in SinksImpl and SourcesImpl
    
    * Fix logic for shouldCloseClassLoader
    
    (cherry picked from commit cab946b4ca68e1ffc6dee3932bc4c0fc7e7da66e)
---
 .../pulsar/common/util/ClassLoaderUtils.java       |  14 ++
 .../pulsar/functions/utils/FunctionCommon.java     | 151 ++++++++++++---------
 .../functions/worker/rest/api/SinksImpl.java       |  32 +++--
 .../functions/worker/rest/api/SourcesImpl.java     |  33 +++--
 4 files changed, 140 insertions(+), 90 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 0e1e188..69e4c63 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -18,16 +18,20 @@
  */
 package org.apache.pulsar.common.util;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Helper methods wrt Classloading.
  */
+@Slf4j
 public class ClassLoaderUtils {
     /**
      * Load a jar.
@@ -76,4 +80,14 @@ public class ClassLoaderUtils {
                     String.format("%s does not implement %s", className, klass.getName()));
         }
     }
+
+    public static void closeClassLoader(ClassLoader classLoader) {
+        if (classLoader instanceof Closeable) {
+            try {
+                ((Closeable) classLoader).close();
+            } catch (IOException e) {
+                log.error("Error closing classloader {}", classLoader, e);
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index a13695e..f72814d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -382,97 +382,114 @@ public class FunctionCommon {
             String narExtractionDirectory) {
         String connectorClassName = className;
         ClassLoader jarClassLoader = null;
+        boolean keepJarClassLoader = false;
         ClassLoader narClassLoader = null;
+        boolean keepNarClassLoader = false;
 
         Exception jarClassLoaderException = null;
         Exception narClassLoaderException = null;
 
         try {
-            jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
-        } catch (Exception e) {
-            jarClassLoaderException = e;
-        }
-        try {
-            narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
-        } catch (Exception e) {
-            narClassLoaderException = e;
-        }
-
-        // if connector class name is not provided, we can only try to load archive as a NAR
-        if (isEmpty(connectorClassName)) {
-            if (narClassLoader == null) {
-                throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
-                                "Pulsar cannot determine if the package is a NAR package or JAR package. " +
-                                "%s classname is not provided and attempts to load it as a NAR package produced the following error.",
-                        capFirstLetter(componentType), capFirstLetter(componentType)),
-                        narClassLoaderException);
-            }
             try {
-                if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
-                    connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
-                } else {
-                    connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
-                        componentType.toString().toLowerCase()), e);
+                jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+            } catch (Exception e) {
+                jarClassLoaderException = e;
             }
-
             try {
-                narClassLoader.loadClass(connectorClassName);
-                return narClassLoader;
-            } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                throw new IllegalArgumentException(
-                        String.format("%s class %s must be in class path", capFirstLetter(componentType), connectorClassName), e);
+                narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
+            } catch (Exception e) {
+                narClassLoaderException = e;
             }
 
-        } else {
-            // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
-            if (jarClassLoader != null) {
+            // if connector class name is not provided, we can only try to load archive as a NAR
+            if (isEmpty(connectorClassName)) {
+                if (narClassLoader == null) {
+                    throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
+                                    "Pulsar cannot determine if the package is a NAR package or JAR package. " +
+                                    "%s classname is not provided and attempts to load it as a NAR package produced " +
+                                    "the following error.",
+                            capFirstLetter(componentType), capFirstLetter(componentType)),
+                            narClassLoaderException);
+                }
                 try {
-                    jarClassLoader.loadClass(connectorClassName);
-                    return jarClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                    // class not found in JAR try loading as a NAR and searching for the class
-                    if (narClassLoader != null) {
-
-                        try {
-                            narClassLoader.loadClass(connectorClassName);
-                            return narClassLoader;
-                        } catch (ClassNotFoundException | NoClassDefFoundError e1) {
-                            throw new IllegalArgumentException(
-                                    String.format("%s class %s must be in class path",
-                                            capFirstLetter(componentType), connectorClassName), e1);
-                        }
+                    if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                        connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
                     } else {
-                        throw new IllegalArgumentException(
-                                String.format("%s class %s must be in class path", capFirstLetter(componentType),
-                                        connectorClassName), e);
+                        connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
                     }
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+                            componentType.toString().toLowerCase()), e);
                 }
-            } else if (narClassLoader != null) {
+
                 try {
                     narClassLoader.loadClass(connectorClassName);
+                    keepNarClassLoader = true;
                     return narClassLoader;
-                } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                } catch (ClassNotFoundException | NoClassDefFoundError e) {
                     throw new IllegalArgumentException(
-                            String.format("%s class %s must be in class path",
-                                    capFirstLetter(componentType), connectorClassName), e1);
+                            String.format("%s class %s must be in class path", capFirstLetter(componentType),
+                                    connectorClassName), e);
                 }
+
             } else {
-                StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
-                        + " package does not have the correct format."
-                        + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+                // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+                if (jarClassLoader != null) {
+                    try {
+                        jarClassLoader.loadClass(connectorClassName);
+                        keepJarClassLoader = true;
+                        return jarClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+                        // class not found in JAR try loading as a NAR and searching for the class
+                        if (narClassLoader != null) {
+
+                            try {
+                                narClassLoader.loadClass(connectorClassName);
+                                keepNarClassLoader = true;
+                                return narClassLoader;
+                            } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                                throw new IllegalArgumentException(
+                                        String.format("%s class %s must be in class path",
+                                                capFirstLetter(componentType), connectorClassName), e1);
+                            }
+                        } else {
+                            throw new IllegalArgumentException(
+                                    String.format("%s class %s must be in class path", capFirstLetter(componentType),
+                                            connectorClassName), e);
+                        }
+                    }
+                } else if (narClassLoader != null) {
+                    try {
+                        narClassLoader.loadClass(connectorClassName);
+                        keepNarClassLoader = true;
+                        return narClassLoader;
+                    } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+                        throw new IllegalArgumentException(
+                                String.format("%s class %s must be in class path",
+                                        capFirstLetter(componentType), connectorClassName), e1);
+                    }
+                } else {
+                    StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
+                            + " package does not have the correct format."
+                            + " Pulsar cannot determine if the package is a NAR package or JAR package.");
 
-                if (jarClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
-                }
+                    if (jarClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+                    }
 
-                if (narClassLoaderException != null) {
-                    errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
-                }
+                    if (narClassLoaderException != null) {
+                        errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+                    }
 
-                throw new IllegalArgumentException(errorMsg.toString());
+                    throw new IllegalArgumentException(errorMsg.toString());
+                }
+            }
+        } finally {
+            if (!keepJarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(jarClassLoader);
+            }
+            if (!keepNarClassLoader) {
+                ClassLoaderUtils.closeClassLoader(narClassLoader);
             }
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 31a7234..bbb1680 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -734,19 +735,28 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             }
         }
 
-        // if sink is not builtin, attempt to extract classloader from package file if it exists
-        if (classLoader == null && sinkPackageFile != null) {
-            classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
-                    sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Sink package is not provided");
-        }
+            // if sink is not builtin, attempt to extract classloader from package file if it exists
+            if (classLoader == null && sinkPackageFile != null) {
+                classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
+                        sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
-                sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
-        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Sink package is not provided");
+            }
+
+            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
+                    sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 1e9148b..82a818d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -730,20 +731,28 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             }
         }
 
-        // if source is not builtin, attempt to extract classloader from package file if it exists
-        if (classLoader == null && sourcePackageFile != null) {
-            classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
-                    sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
-        }
+        boolean shouldCloseClassLoader = false;
+        try {
+            // if source is not builtin, attempt to extract classloader from package file if it exists
+            if (classLoader == null && sourcePackageFile != null) {
+                classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
+                        sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                shouldCloseClassLoader = true;
+            }
 
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Source package is not provided");
-        }
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Source package is not provided");
+            }
 
-        SourceConfigUtils.ExtractedSourceDetails sourceDetails
-                = SourceConfigUtils.validateAndExtractDetails(
-                        sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
-        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+            SourceConfigUtils.ExtractedSourceDetails sourceDetails
+                    = SourceConfigUtils.validateAndExtractDetails(
+                            sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+        } finally {
+            if (shouldCloseClassLoader) {
+                ClassLoaderUtils.closeClassLoader(classLoader);
+            }
+        }
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {

[pulsar] 04/04: Update lombok to 1.18.22 (#12466)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c33b53ada29c7d18ed7d11456c34554d061a0a1f
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Fri Oct 22 09:12:56 2021 -0700

    Update lombok to 1.18.22 (#12466)
    
    Co-authored-by: Ali Ahmed <al...@splunk.com>
    (cherry picked from commit 9ee6656fbd662d1d6f36b8e3b71e01f07c93a2fa)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 480ef81..09c5c6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,7 @@ flexible messaging model and an intuitive client API.</description>
     <hppc.version>0.7.3</hppc.version>
     <spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
     <assertj-core.version>3.18.1</assertj-core.version>
-    <lombok.version>1.18.20</lombok.version>
+    <lombok.version>1.18.22</lombok.version>
     <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
     <jaxb-api>2.3.1</jaxb-api>
     <javax.activation.version>1.2.0</javax.activation.version>

[pulsar] 01/04: [Java Client] Make Audience Field Optional in OAuth2 Client Credentials (#11988)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06c344f2a60ffa995be81669059be1cd6ef6851e
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Thu Dec 2 22:55:44 2021 -0600

    [Java Client] Make Audience Field Optional in OAuth2 Client Credentials (#11988)
    
    * [Java Client] Make Audience Field in Client Credentials Optional
    
    * Update site2/website-next/docs
    
    * Remove update to 2.8.1 docs
    
    * Update more docs based on code review
    
    (cherry picked from commit b2b5463a574806b611c4065a0a27c154e7cbe0bd)
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  6 +++---
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |  4 ++--
 .../pulsar/client/impl/auth/oauth2/README.md       |  4 ++--
 .../impl/auth/oauth2/protocol/TokenClient.java     | 25 ++++++++++++----------
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 13 +++++++++++
 .../impl/auth/oauth2/protocol/TokenClientTest.java | 18 +++-------------
 site2/docs/security-oauth2.md                      |  4 ++--
 site2/website-next/docs/security-oauth2.md         |  4 ++--
 8 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 707fcaf..cf56774 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -33,7 +33,7 @@ public final class AuthenticationFactoryOAuth2 {
      *
      * @param issuerUrl the issuer URL
      * @param credentialsUrl the credentials URL
-     * @param audience the audience identifier
+     * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
      * @return an Authentication object
      */
     public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
@@ -45,9 +45,9 @@ public final class AuthenticationFactoryOAuth2 {
      *
      * @param issuerUrl the issuer URL
      * @param credentialsUrl the credentials URL
-     * @param audience the audience identifier
+     * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
      * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
-     *              case-sensitive strings.  The strings are defined by the authorization server.
+     *              case-sensitive strings. The strings are defined by the authorization server.
      *              If the value contains multiple space-delimited strings, their order does not matter,
      *              and each string adds an additional access range to the requested scope.
      *              From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index b011e85..bf0c289 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -118,10 +118,10 @@ class ClientCredentialsFlow extends FlowBase {
      */
     public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
         URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
-        String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
         String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
-        // This is an optional parameter
+        // These are optional parameters, so we only perform a get
         String scope = params.get(CONFIG_PARAM_SCOPE);
+        String audience = params.get(CONFIG_PARAM_AUDIENCE);
         return ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .audience(audience)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/README.md b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/README.md
index 55ffe58..b8b1237 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/README.md
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/README.md
@@ -46,7 +46,7 @@ The following parameters are supported:
 | `type` | Oauth 2.0 auth type. Optional. | default: `client_credentials`  |
 | `issuerUrl` | URL of the provider which allows Pulsar to obtain an access token. Required. | `https://accounts.google.com` |
 | `privateKey` | URL to a JSON credentials file (in JSON format; see below). Required. | See "Supported Pattern Formats" |
-| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required. | `https://broker.example.com` |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required by some Identity Providers. Optional for client. | `https://broker.example.com` |
 
 ### Supported Pattern Formats of `privateKey`
 The `privateKey` parameter supports the following three pattern formats, and contains client Credentials:
@@ -88,7 +88,7 @@ curl --request POST \
 In which,
 - `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com`
 - `privateKey` file parameter in this plugin should at least contains fields `client_id` and `client_secret`.
-- `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`
+- `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`. This field is only used by some identity providers.
 
 ## Pulsar Client Config
 You can use the provider with the following Pulsar clients.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index f8667e8..c2b9779 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -73,10 +73,21 @@ public class TokenClient implements ClientCredentialsExchanger {
 
     /**
      * Constructing http request parameters.
-     * @param bodyMap List of parameters to be requested.
+     * @param req object with relevant request parameters
      * @return Generate the final request body from a map.
      */
-    String buildClientCredentialsBody(Map<String, String> bodyMap) {
+    String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
+        Map<String, String> bodyMap = new TreeMap<>();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", req.getClientId());
+        bodyMap.put("client_secret", req.getClientSecret());
+        // Only set audience and scope if they are non-empty.
+        if (!StringUtils.isBlank(req.getAudience())) {
+            bodyMap.put("audience", req.getAudience());
+        }
+        if (!StringUtils.isBlank(req.getScope())) {
+            bodyMap.put("scope", req.getScope());
+        }
         return bodyMap.entrySet().stream()
                 .map(e -> {
                     try {
@@ -96,15 +107,7 @@ public class TokenClient implements ClientCredentialsExchanger {
      */
     public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
             throws TokenExchangeException, IOException {
-        Map<String, String> bodyMap = new TreeMap<>();
-        bodyMap.put("grant_type", "client_credentials");
-        bodyMap.put("client_id", req.getClientId());
-        bodyMap.put("client_secret", req.getClientSecret());
-        bodyMap.put("audience", req.getAudience());
-        if (!StringUtils.isBlank(req.getScope())) {
-            bodyMap.put("scope", req.getScope());
-        }
-        String body = buildClientCredentialsBody(bodyMap);
+        String body = buildClientCredentialsBody(req);
 
         try {
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index ac14dd2..3ae578c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -86,6 +86,19 @@ public class AuthenticationOAuth2Test {
         params.put("privateKey", "data:base64,e30=");
         params.put("issuerUrl", "http://localhost");
         params.put("audience", "http://localhost");
+        params.put("scope", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        this.auth.configure(authParams);
+        assertNotNull(this.auth.flow);
+    }
+
+    @Test
+    public void testConfigureWithoutOptionalParams() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("issuerUrl", "http://localhost");
         ObjectMapper mapper = new ObjectMapper();
         String authParams = mapper.writeValueAsString(params);
         this.auth.configure(authParams);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
index 1617359..da70d6c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -47,19 +47,13 @@ public class TokenClientTest {
         DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
         URL url = new URL("http://localhost");
         TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
-        Map<String, String> bodyMap = new TreeMap<>();
         ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
                 .audience("test-audience")
                 .clientId("test-client-id")
                 .clientSecret("test-client-secret")
                 .scope("test-scope")
                 .build();
-        bodyMap.put("grant_type", "client_credentials");
-        bodyMap.put("client_id", request.getClientId());
-        bodyMap.put("client_secret", request.getClientSecret());
-        bodyMap.put("audience", request.getAudience());
-        bodyMap.put("scope", request.getScope());
-        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        String body = tokenClient.buildClientCredentialsBody(request);
         BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
         Response response = mock(Response.class);
         ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
@@ -80,22 +74,16 @@ public class TokenClientTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void exchangeClientCredentialsSuccessByNoScopeTest() throws
+    public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest() throws
             IOException, TokenExchangeException, ExecutionException, InterruptedException {
         DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
         URL url = new URL("http://localhost");
         TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
-        Map<String, String> bodyMap = new TreeMap<>();
         ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
-                .audience("test-audience")
                 .clientId("test-client-id")
                 .clientSecret("test-client-secret")
                 .build();
-        bodyMap.put("grant_type", "client_credentials");
-        bodyMap.put("client_id", request.getClientId());
-        bodyMap.put("client_secret", request.getClientSecret());
-        bodyMap.put("audience", request.getAudience());
-        String body = tokenClient.buildClientCredentialsBody(bodyMap);
+        String body = tokenClient.buildClientCredentialsBody(request);
         BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
         Response response = mock(Response.class);
         ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
diff --git a/site2/docs/security-oauth2.md b/site2/docs/security-oauth2.md
index 7ea9f35..35d0b5f 100644
--- a/site2/docs/security-oauth2.md
+++ b/site2/docs/security-oauth2.md
@@ -28,7 +28,7 @@ The following table lists parameters supported for the `client credentials` auth
 | `type` | Oauth 2.0 authentication type. |  `client_credentials` (default) | Optional |
 | `issuerUrl` | URL of the authentication provider which allows the Pulsar client to obtain an access token | `https://accounts.google.com` | Required |
 | `privateKey` | URL to a JSON credentials file  | Support the following pattern formats: <br> <li> `file:///path/to/file` <li>`file:/path/to/file` <li> `data:application/json;base64,<base64-encoded value>` | Required |
-| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster | `https://broker.example.com` | Required |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster | `https://broker.example.com` | Optional |
 
 The credentials file contains service account credentials used with the client authentication type. The following shows an example of a credentials file `credentials_file.json`.
 
@@ -63,7 +63,7 @@ In the above example, the mapping relationship is shown as below.
 
 - The `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com`.
 - The `privateKey` file parameter in this plugin should at least contains the `client_id` and `client_secret` fields.
-- The `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`.
+- The `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`. This field is only used by some identity providers.
 
 ## Client Configuration
 
diff --git a/site2/website-next/docs/security-oauth2.md b/site2/website-next/docs/security-oauth2.md
index 19c3789..820a696 100644
--- a/site2/website-next/docs/security-oauth2.md
+++ b/site2/website-next/docs/security-oauth2.md
@@ -32,7 +32,7 @@ The following table lists parameters supported for the `client credentials` auth
 | `type` | Oauth 2.0 authentication type. |  `client_credentials` (default) | Optional |
 | `issuerUrl` | URL of the authentication provider which allows the Pulsar client to obtain an access token | `https://accounts.google.com` | Required |
 | `privateKey` | URL to a JSON credentials file  | Support the following pattern formats: <br /> <li> `file:///path/to/file` </li><li>`file:/path/to/file` </li><li> `data:application/json;base64,<base64-encoded value>` </li>| Required |
-| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster | `https://broker.example.com` | Required |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster | `https://broker.example.com` | Optional |
 
 The credentials file contains service account credentials used with the client authentication type. The following shows an example of a credentials file `credentials_file.json`.
 
@@ -71,7 +71,7 @@ In the above example, the mapping relationship is shown as below.
 
 - The `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com`.
 - The `privateKey` file parameter in this plugin should at least contains the `client_id` and `client_secret` fields.
-- The `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`.
+- The `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`. This field is only used by some identity providers.
 
 ## Client Configuration
 

[pulsar] 03/04: [Broker] Fix and improve topic ownership assignment (#13069)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b95c6f3113bd1b927a98de2c0641323431527ab8
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 13:38:55 2021 +0200

    [Broker] Fix and improve topic ownership assignment (#13069)
    
    * Add warning log message when leader broker isn't available
    
    * Add more logging about load manager decisions
    
    * Use cached information for available brokers
    
    * Reproduce lookup race issue
    
    * Use java.util.concurrent.Phaser to increase the chances of a race
    
    * Address review feedback
    
    * Increase concurrency of test case to reproduce race conditions
    
    * Use real Zookeeper server in MultiBrokerLeaderElectionTest
    
    * Add retry with backoff to loading namespace bundles
    
    * Add more topics to test
    
    * Address review comment
    
    * Fix checkstyle
    
    * Improve logging
    
    * Address review comments
    
    (cherry picked from commit 537dee113ccb1e405a8fd1d5698b8f066f52bb53)
---
 pulsar-broker/pom.xml                              |   8 ++
 .../pulsar/broker/namespace/NamespaceService.java  | 104 ++++++++++++++-------
 .../common/naming/NamespaceBundleFactory.java      |  32 ++++++-
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  11 ++-
 .../loadbalance/MultiBrokerLeaderElectionTest.java |  97 +++++++++++++++++++
 pulsar-metadata/pom.xml                            |  11 +++
 7 files changed, 227 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index aef0f2e..bb7bee7 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -568,6 +568,14 @@
           <type>test-jar</type>
           <scope>test</scope>
         </dependency>
+
+        <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>pulsar-metadata</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index adcb504..dc8f383 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -454,6 +454,9 @@ public class NamespaceService implements AutoCloseable {
             // The leader election service was not initialized yet. This can happen because the broker service is
             // initialized first and it might start receiving lookup requests before the leader election service is
             // fully initialized.
+            LOG.warn("Leader election service isn't initialized yet. "
+                            + "Returning empty result to lookup. NamespaceBundle[{}]",
+                    bundle);
             lookupFuture.complete(Optional.empty());
             return;
         }
@@ -480,23 +483,45 @@ public class NamespaceService implements AutoCloseable {
                 if (options.isAuthoritative()) {
                     // leader broker already assigned the current broker as owner
                     candidateBroker = pulsar.getSafeWebServiceAddress();
-                } else if (!this.loadManager.get().isCentralized()
-                        || pulsar.getLeaderElectionService().isLeader()
-                        || !currentLeader.isPresent()
-
+                } else {
+                    LoadManager loadManager = this.loadManager.get();
+                    boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
+                    if (!makeLoadManagerDecisionOnThisBroker) {
                         // If leader is not active, fallback to pick the least loaded from current broker loadmanager
-                        || !isBrokerActive(currentLeader.get().getServiceUrl())
-                ) {
-                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
-                    if (!availableBroker.isPresent()) {
-                        lookupFuture.complete(Optional.empty());
-                        return;
+                        boolean leaderBrokerActive = currentLeader.isPresent()
+                                && isBrokerActive(currentLeader.get().getServiceUrl());
+                        if (!leaderBrokerActive) {
+                            makeLoadManagerDecisionOnThisBroker = true;
+                            if (!currentLeader.isPresent()) {
+                                LOG.warn(
+                                        "The information about the current leader broker wasn't available. "
+                                                + "Handling load manager decisions in a decentralized way. "
+                                                + "NamespaceBundle[{}]",
+                                        bundle);
+                            } else {
+                                LOG.warn(
+                                        "The current leader broker {} isn't active. "
+                                                + "Handling load manager decisions in a decentralized way. "
+                                                + "NamespaceBundle[{}]",
+                                        currentLeader.get(), bundle);
+                            }
+                        }
+                    }
+                    if (makeLoadManagerDecisionOnThisBroker) {
+                        Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
+                        if (!availableBroker.isPresent()) {
+                            LOG.warn("Load manager didn't return any available broker. "
+                                            + "Returning empty result to lookup. NamespaceBundle[{}]",
+                                    bundle);
+                            lookupFuture.complete(Optional.empty());
+                            return;
+                        }
+                        candidateBroker = availableBroker.get();
+                        authoritativeRedirect = true;
+                    } else {
+                        // forward to leader broker to make assignment
+                        candidateBroker = currentLeader.get().getServiceUrl();
                     }
-                    candidateBroker = availableBroker.get();
-                    authoritativeRedirect = true;
-                } else {
-                    // forward to leader broker to make assignment
-                    candidateBroker = currentLeader.get().getServiceUrl();
                 }
             }
         } catch (Exception e) {
@@ -577,19 +602,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
-                                                                 final String advertisedListenerName)
-            throws Exception {
+                                                                 final String advertisedListenerName) {
 
         CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
         try {
-            checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
-            URI uri = new URI(candidateBroker);
-            String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
-                    uri.getPort());
+            checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
+            String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
 
             localBrokerDataCache.get(path).thenAccept(reportData -> {
                 if (reportData.isPresent()) {
-                    LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
+                    LocalBrokerData lookupData = reportData.get();
                     if (StringUtils.isNotBlank(advertisedListenerName)) {
                         AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
                         if (listener == null) {
@@ -622,22 +644,36 @@ public class NamespaceService implements AutoCloseable {
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
-                }
-                return true;
+        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+        Set<String> availableBrokers = getAvailableBrokers();
+        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
             }
+            return true;
+        } else {
+            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+                    candidateBroker, candidateBrokerHostAndPort,
+                    availableBrokers.stream().collect(Collectors.joining(",")));
+            return false;
         }
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+    private static String parseHostAndPort(String candidateBroker) {
+        int uriSeparatorPos = candidateBroker.indexOf("://");
+        if (uriSeparatorPos == -1) {
+            throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
+        }
+        String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
+        return candidateBrokerHostAndPort;
+    }
+
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        return false;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 9fbcf17..586e3b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -31,6 +31,7 @@ import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.hash.HashFunction;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -64,6 +66,7 @@ public class NamespaceBundleFactory {
 
     private final PulsarService pulsar;
     private final MetadataCache<Policies> policiesCache;
+    private final Duration maxRetryDuration = Duration.ofSeconds(10);
 
     public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         this.hashFunc = hashFunc;
@@ -90,22 +93,27 @@ public class NamespaceBundleFactory {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
@@ -113,7 +121,23 @@ public class NamespaceBundleFactory {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {
+            future.completeExceptionally(e);
+        } else {
+            LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
+            long retryDelay = backoff.next();
+            pulsar.getExecutor().schedule(() ->
+                    doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private static Backoff createBackoff() {
+        return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
     }
 
     private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index f4d106d..c00ae8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.testng.annotations.AfterClass;
@@ -82,13 +84,13 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
     }
 
     @Override
-    protected ZKMetadataStore createLocalMetadataStore() {
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         // use MockZooKeeperSession to provide a unique session id for each instance
         return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
     }
 
     @Override
-    protected ZKMetadataStore createConfigurationMetadataStore() {
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
         // use MockZooKeeperSession to provide a unique session id for each instance
         return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index a3f7166..c50b477 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -60,6 +60,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -245,6 +247,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
             }
             bkExecutor = null;
         }
+        onCleanup();
+    }
+
+    protected void onCleanup() {
+
     }
 
     protected abstract void setup() throws Exception;
@@ -332,11 +339,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
     }
 
-    protected ZKMetadataStore createLocalMetadataStore() {
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeper);
     }
 
-    protected ZKMetadataStore createConfigurationMetadataStore() {
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeperGlobal);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 0045ddd..462b640 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,15 +20,68 @@ package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 9;
+    }
+
+    TestZKServer testZKServer;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        testZKServer = new TestZKServer();
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+        if (testZKServer != null) {
+            try {
+                testZKServer.close();
+            } catch (Exception e) {
+                log.error("Error in stopping ZK server", e);
+            }
+        }
+    }
+
+    @Override
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+
+    @Override
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
 
     @Test
     public void shouldElectOneLeader() {
@@ -68,4 +121,48 @@ public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
             }
         });
     }
+
+    @Test
+    public void shouldProvideConsistentAnswerToTopicLookups()
+            throws PulsarAdminException, ExecutionException, InterruptedException {
+        String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+        List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+                .collect(Collectors.toList());
+        List<PulsarAdmin> allAdmins = getAllAdmins();
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
+        List<Future<List<String>>> resultFutures = new ArrayList<>();
+        String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
+        log.info("LEADER is {}", leaderBrokerUrl);
+        // use Phaser to increase the chances of a race condition by triggering all threads once
+        // they are waiting just before the lookupTopic call
+        final Phaser phaser = new Phaser(1);
+        for (PulsarAdmin brokerAdmin : allAdmins) {
+            if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
+                phaser.register();
+                log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+                resultFutures.add(executorService.submit(() -> {
+                    phaser.arriveAndAwaitAdvance();
+                    return topicNames.stream().map(topicName -> {
+                        try {
+                            return brokerAdmin.lookups().lookupTopic(topicName);
+                        } catch (PulsarAdminException e) {
+                            log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+                            throw new RuntimeException(e);
+                        }
+                    }).collect(Collectors.toList());
+                }));
+            }
+        }
+        phaser.arriveAndAwaitAdvance();
+        List<String> firstResult = null;
+        for (Future<List<String>> resultFuture : resultFutures) {
+            List<String> result = resultFuture.get();
+            if (firstResult == null) {
+                firstResult = result;
+            } else {
+                assertEquals(result, firstResult, "The lookup results weren't consistent.");
+            }
+        }
+    }
 }
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 7ce5a6b..c98d951 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -78,6 +78,17 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>com.github.spotbugs</groupId>
         <artifactId>spotbugs-maven-plugin</artifactId>
         <version>${spotbugs-maven-plugin.version}</version>