You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/04/13 18:22:24 UTC

[GitHub] merlimat closed pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase

merlimat closed pull request #1562: Converted to v2 topic names test related to ProducerConsumerBase
URL: https://github.com/apache/incubator-pulsar/pull/1562
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index bf0b2551d..d47bec960 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -128,7 +128,7 @@ public void deleteNamespace(@PathParam("property") String property, @PathParam("
     }
 
     @DELETE
-    @Path("/{property}/{namespace}/bundle/{bundle}")
+    @Path("/{property}/{namespace}/{bundle}")
     @ApiOperation(value = "Delete a namespace bundle and all the topics under it.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index 803d6c3e7..d8ae7574c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.broker.lookup;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.net.URI;
-
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.common.lookup.data.LookupData;
 
@@ -69,5 +65,5 @@ public LookupData getLookupData() {
     public String toString() {
 		return "LookupResult [type=" + type + ", lookupData=" + lookupData + "]";
     }
-    
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
similarity index 72%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookup.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index f08530a74..39ad90e5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -44,6 +44,7 @@
 
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -62,39 +63,22 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-@Path("/v2/destination/")
-@NoSwaggerDocumentation
-public class TopicLookup extends PulsarWebResource {
-
-    @GET
-    @Path("{topic-domain}/{property}/{cluster}/{namespace}/{topic}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain,
-            @PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended AsyncResponse asyncResponse) {
-        String topicName = Codec.decode(encodedTopic);
-        TopicDomain domain = null;
-        try {
-            domain = TopicDomain.getEnum(topicDomain);
-        } catch (IllegalArgumentException e) {
-            log.error("[{}] Invalid topic-domain {}", clientAppId(), topicDomain, e);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Unsupported topic domain " + topicDomain);
-        }
-        TopicName topic = TopicName.get(domain.value(), property, cluster, namespace, topicName);
+public class TopicLookupBase extends PulsarWebResource {
+
+    private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
+    private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
 
+    protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse) {
         if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
-            log.warn("No broker was found available for topic {}", topic);
+            log.warn("No broker was found available for topic {}", topicName);
             asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
             return;
         }
 
         try {
-            validateClusterOwnership(topic.getCluster());
-            checkConnect(topic);
-            validateGlobalNamespaceOwnership(topic.getNamespaceObject());
+            validateClusterOwnership(topicName.getCluster());
+            checkConnect(topicName);
+            validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
         } catch (WebApplicationException we) {
             // Validation checks failed
             log.error("Validation check failed: {}", we.getMessage());
@@ -107,13 +91,14 @@ public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain,
             return;
         }
 
-        CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService().getBrokerServiceUrlAsync(topic,
-                authoritative);
+        CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
+                .getBrokerServiceUrlAsync(topicName, authoritative);
 
         lookupFuture.thenAccept(optionalResult -> {
             if (optionalResult == null || !optionalResult.isPresent()) {
-                log.warn("No broker was found available for topic {}", topic);
-                completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+                log.warn("No broker was found available for topic {}", topicName);
+                completeLookupResponseExceptionally(asyncResponse,
+                        new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
                 return;
             }
 
@@ -126,57 +111,41 @@ public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain,
                     String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
                             : result.getLookupData().getHttpUrl();
                     checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
-                    redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, "/lookup/v2/destination/",
-                            topic.getLookupName(), newAuthoritative));
+                    String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
+                    redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, lookupPath,
+                            topicName.getLookupName(), newAuthoritative));
                 } catch (URISyntaxException | NullPointerException e) {
-                    log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
+                    log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
                     completeLookupResponseExceptionally(asyncResponse, e);
                     return;
                 }
                 if (log.isDebugEnabled()) {
-                    log.debug("Redirect lookup for topic {} to {}", topic, redirect);
+                    log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
                 }
-                completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build()));
+                completeLookupResponseExceptionally(asyncResponse,
+                        new WebApplicationException(Response.temporaryRedirect(redirect).build()));
 
             } else {
                 // Found broker owning the topic
                 if (log.isDebugEnabled()) {
-                    log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData());
+                    log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
                 }
                 completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
             }
         }).exceptionally(exception -> {
-            log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception);
+            log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
             completeLookupResponseExceptionally(asyncResponse, exception);
             return null;
         });
-
     }
 
-    @GET
-    @Path("{topic-domain}/{property}/{cluster}/{namespace}/{topic}/bundle")
-    @Produces(MediaType.APPLICATION_JSON)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 405, message = "Invalid topic domain type") })
-    public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
-            @PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topicName) {
-        topicName = Codec.decode(topicName);
-        TopicDomain domain = null;
-        try {
-            domain = TopicDomain.getEnum(topicDomain);
-        } catch (IllegalArgumentException e) {
-            log.error("[{}] Invalid topic-domain {}", clientAppId(), topicDomain, e);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Bundle lookup can not be done on topic domain " + topicDomain);
-        }
-        TopicName topic = TopicName.get(domain.value(), property, cluster, namespace, topicName);
+    protected String internalGetNamespaceBundle(TopicName topicName) {
         validateSuperUserAccess();
         try {
-            NamespaceBundle bundle = pulsar().getNamespaceService().getBundle(topic);
+            NamespaceBundle bundle = pulsar().getNamespaceService().getBundle(topicName);
             return bundle.getBundleRange();
         } catch (Exception e) {
-            log.error("[{}] Failed to get namespace bundle for {}", clientAppId(), topic, e);
+            log.error("[{}] Failed to get namespace bundle for {}", clientAppId(), topicName, e);
             throw new RestException(e);
         }
     }
@@ -185,13 +154,11 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
      *
      * Lookup broker-service address for a given namespace-bundle which contains given topic.
      *
-     * a. Returns broker-address if namespace-bundle is already owned by any broker
-     * b. If current-broker receives lookup-request and if it's not a leader
-     * then current broker redirects request to leader by returning leader-service address.
-     * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and
-     * redirects request by returning least-loaded broker.
-     * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns
-     * success(connect) response to client.
+     * a. Returns broker-address if namespace-bundle is already owned by any broker b. If current-broker receives
+     * lookup-request and if it's not a leader then current broker redirects request to leader by returning
+     * leader-service address. c. If current-broker is leader then it finds out least-loaded broker to own namespace
+     * bundle and redirects request by returning least-loaded broker. d. If current-broker receives request to own the
+     * namespace-bundle then it owns a bundle and returns success(connect) response to client.
      *
      * @param pulsarService
      * @param topicName
@@ -200,8 +167,8 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
      * @param requestId
      * @return
      */
-    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative,
-            String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
+    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
+            boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
 
         final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
         final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -224,8 +191,8 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
                     checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
                 } catch (RestException authException) {
                     log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(
-                            newLookupErrorResponse(ServerError.AuthorizationError, authException.getMessage(), requestId));
+                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                            authException.getMessage(), requestId));
                     return;
                 } catch (Exception e) {
                     log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
@@ -244,8 +211,8 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
                             // request should be redirect to the peer-cluster
                             if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
                                     && StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())) {
-                                validationFuture.complete(
-                                        newLookupErrorResponse(ServerError.MetadataError, "Redirected cluster's brokerService url is not configured", requestId));
+                                validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+                                        "Redirected cluster's brokerService url is not configured", requestId));
                                 return;
                             }
                             validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
@@ -253,8 +220,8 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
                                     false));
 
                         }).exceptionally(ex -> {
-                            validationFuture
-                                    .complete(newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
+                            validationFuture.complete(
+                                    newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
                             return null;
                         });
             }
@@ -293,17 +260,17 @@ public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
                                 boolean redirectThroughServiceUrl = pulsarService.getConfiguration()
                                         .isRunningStandalone();
 
-                                lookupfuture.complete(
-                                        newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
-                                                true /* authoritative */, LookupType.Connect, requestId, redirectThroughServiceUrl));
+                                lookupfuture.complete(newLookupResponse(lookupData.getBrokerUrl(),
+                                        lookupData.getBrokerUrlTls(), true /* authoritative */, LookupType.Connect,
+                                        requestId, redirectThroughServiceUrl));
                             }
                         }).exceptionally(ex -> {
                             if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
-                                log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
-                                        ex.getCause().getMessage());
+                                log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
+                                        topicName.toString(), ex.getCause().getMessage());
                             } else {
-                                log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
-                                        ex.getMessage(), ex);
+                                log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
+                                        topicName.toString(), ex.getMessage(), ex);
                             }
                             lookupfuture.complete(
                                     newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
@@ -337,5 +304,17 @@ private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, Loo
         asyncResponse.resume(lookupData);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(TopicLookup.class);
+    protected TopicName getTopicName(String topicDomain, String property, String cluster, String namespace,
+            @Encoded String encodedTopic) {
+        String decodedName = Codec.decode(encodedTopic);
+        return TopicName.get(TopicDomain.getEnum(topicDomain).value(), property, cluster, namespace, decodedName);
+    }
+
+    protected TopicName getTopicName(String topicDomain, String property, String namespace,
+            @Encoded String encodedTopic) {
+        String decodedName = Codec.decode(encodedTopic);
+        return TopicName.get(TopicDomain.getEnum(topicDomain).value(), property, namespace, decodedName);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(TopicLookupBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
new file mode 100644
index 000000000..167311a8d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.lookup.v1;
+
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.broker.lookup.TopicLookupBase;
+import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * The path for this handler is marked as "v2" even though it refers to Pulsar 1.x topic name format.
+ *
+ * The lookup API was already <code>/v2/</code> in Pulsar 1.x. This was internally versioned at Yahoo to not clash with
+ * an earlier API.
+ *
+ * Since we're adding now the "Pulsar v2" we cannot rename this topic lookup into <code>/v1</code>. Rather the
+ * difference here would be : <code>lookup/v2/destination/persistent/prop/cluster/ns/topic</code> vs
+ * <code>lookup/v2/topic/persistent/prop/ns/topic</code>.
+ */
+@Path("/v2/destination/")
+@NoSwaggerDocumentation
+public class TopicLookup extends TopicLookupBase {
+
+    @GET
+    @Path("{topic-domain}/{property}/{cluster}/{namespace}/{topic}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended AsyncResponse asyncResponse) {
+        TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
+        internalLookupTopicAsync(topicName, authoritative, asyncResponse);
+    }
+
+    @GET
+    @Path("{topic-domain}/{property}/{cluster}/{namespace}/{topic}/bundle")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 405, message = "Invalid topic domain type") })
+    public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic) {
+        TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
+        return internalGetNamespaceBundle(topicName);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
new file mode 100644
index 000000000..c5a3c18a1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.lookup.v2;
+
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.broker.lookup.TopicLookupBase;
+import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Path("/v2/topic")
+@NoSwaggerDocumentation
+public class TopicLookup extends TopicLookupBase {
+
+    @GET
+    @Path("{topic-domain}/{property}/{namespace}/{topic}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended AsyncResponse asyncResponse) {
+        TopicName topicName = getTopicName(topicDomain, property, namespace, encodedTopic);
+        internalLookupTopicAsync(topicName, authoritative, asyncResponse);
+    }
+
+    @GET
+    @Path("{topic-domain}/{property}/{namespace}/{topic}/bundle")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 405, message = "Invalid topic domain type") })
+    public String getNamespaceBundle(@PathParam("topic-domain") String topicDomain,
+            @PathParam("property") String property, @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        TopicName topicName = getTopicName(topicDomain, property, namespace, encodedTopic);
+        return internalGetNamespaceBundle(topicName);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 239756eff..d3488e88b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,7 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata;
-import static org.apache.pulsar.broker.lookup.TopicLookup.lookupTopicAsync;
+import static org.apache.pulsar.broker.lookup.v1.TopicLookup.lookupTopicAsync;
 import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 62670aaf3..af6ed3a11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -294,7 +294,7 @@ public static String getNamespaceFromPoliciesPath(String path) {
         i.next();
         i.next();
         // prop, cluster, namespace
-        return Joiner.on("/").join(i.next(), i.next(), i.next());
+        return Joiner.on("/").join(i);
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c53db5b3c..192f96531 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1747,7 +1747,7 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
 
     /**
      * This test-case verifies that broker should support both url/uri encoding for topic-name. It calls below api with
-     * url-encoded and also uri-encoded topic-name in http request: a. PartitionedMetadataLookup b. TopicLookup c. Topic
+     * url-encoded and also uri-encoded topic-name in http request: a. PartitionedMetadataLookup b. TopicLookupBase c. Topic
      * Stats
      *
      * @param topicName
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index b6a3af3da..6ddfc796c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -43,9 +43,9 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.lookup.TopicLookup;
 import org.apache.pulsar.broker.lookup.NamespaceData;
 import org.apache.pulsar.broker.lookup.RedirectData;
+import org.apache.pulsar.broker.lookup.v1.TopicLookup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
index 7bb6a8fa5..44a57b72e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
@@ -43,7 +43,7 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testCreateNamespaceWithDefaultBundles() throws Exception {
-        String namespaceName = "prop/use/default-bundles";
+        String namespaceName = "prop/default-bundles";
 
         admin.namespaces().createNamespace(namespaceName);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
index 1da95640c..855d53a4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
@@ -20,6 +20,8 @@
 
 import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Sets;
+
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Producer;
 import org.testng.annotations.AfterMethod;
@@ -42,23 +44,25 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testUnloadNotLoadedNamespace() throws Exception {
-        admin.namespaces().createNamespace("prop/use/ns-test-1");
+        admin.namespaces().createNamespace("prop/ns-test-1");
+        admin.namespaces().setNamespaceReplicationClusters("prop/ns-test-1", Sets.newHashSet("test"));
 
-        assertTrue(admin.namespaces().getNamespaces("prop", "use").contains("prop/use/ns-test-1"));
+        assertTrue(admin.namespaces().getNamespaces("prop").contains("prop/ns-test-1"));
 
-        admin.namespaces().unload("prop/use/ns-test-1");
+        admin.namespaces().unload("prop/ns-test-1");
     }
 
     @Test
     public void testUnloadPartiallyLoadedNamespace() throws Exception {
-        admin.namespaces().createNamespace("prop/use/ns-test-2", 16);
+        admin.namespaces().createNamespace("prop/ns-test-2", 16);
+        admin.namespaces().setNamespaceReplicationClusters("prop/ns-test-2", Sets.newHashSet("test"));
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop/use/ns-test-2/my-topic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop/ns-test-2/my-topic")
                 .create();
 
-        assertTrue(admin.namespaces().getNamespaces("prop", "use").contains("prop/use/ns-test-2"));
+        assertTrue(admin.namespaces().getNamespaces("prop").contains("prop/ns-test-2"));
 
-        admin.namespaces().unload("prop/use/ns-test-2");
+        admin.namespaces().unload("prop/ns-test-2");
 
         producer.close();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index d58c690fb..b2b09982c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -95,9 +95,12 @@ void setup() throws Exception {
             admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
             admin.properties().createProperty("prop",
                     new PropertyAdmin(Sets.newHashSet("appid1"), Sets.newHashSet("usc")));
-            admin.namespaces().createNamespace("prop/usc/ns-quota");
-            admin.namespaces().createNamespace("prop/usc/quotahold");
-            admin.namespaces().createNamespace("prop/usc/quotaholdasync");
+            admin.namespaces().createNamespace("prop/ns-quota");
+            admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc"));
+            admin.namespaces().createNamespace("prop/quotahold");
+            admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc"));
+            admin.namespaces().createNamespace("prop/quotaholdasync");
+            admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc"));
         } catch (Throwable t) {
             LOG.error("Error setting up broker test", t);
             Assert.fail("Broker test setup failed");
@@ -122,13 +125,13 @@ private void rolloverStats() {
 
     @Test
     public void testConsumerBacklogEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/ns-quota"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/ns-quota",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
         PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
-        final String topic1 = "persistent://prop/usc/ns-quota/topic1";
+        final String topic1 = "persistent://prop/ns-quota/topic1";
         final String subName1 = "c1";
         final String subName2 = "c2";
         final int numMsgs = 20;
@@ -153,12 +156,12 @@ public void testConsumerBacklogEviction() throws Exception {
 
     @Test
     public void testConsumerBacklogEvictionWithAck() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/ns-quota"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/ns-quota",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
         PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
 
-        final String topic1 = "persistent://prop/usc/ns-quota/topic11";
+        final String topic1 = "persistent://prop/ns-quota/topic11";
         final String subName1 = "c11";
         final String subName2 = "c21";
         final int numMsgs = 20;
@@ -184,11 +187,11 @@ public void testConsumerBacklogEvictionWithAck() throws Exception {
 
     @Test
     public void testConcurrentAckAndEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/ns-quota"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/ns-quota",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
-        final String topic1 = "persistent://prop/usc/ns-quota/topic12";
+        final String topic1 = "persistent://prop/ns-quota/topic12";
         final String subName1 = "c12";
         final String subName2 = "c22";
         final int numMsgs = 20;
@@ -256,11 +259,11 @@ public void run() {
 
     @Test
     public void testNoEviction() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/ns-quota"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/ns-quota",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
-        final String topic1 = "persistent://prop/usc/ns-quota/topic13";
+        final String topic1 = "persistent://prop/ns-quota/topic13";
         final String subName1 = "c13";
         final String subName2 = "c23";
         final int numMsgs = 10;
@@ -321,11 +324,11 @@ public void run() {
 
     @Test
     public void testEvictionMulti() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/ns-quota"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/ns-quota",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
                 new BacklogQuota(15 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
 
-        final String topic1 = "persistent://prop/usc/ns-quota/topic14";
+        final String topic1 = "persistent://prop/ns-quota/topic14";
         final String subName1 = "c14";
         final String subName2 = "c24";
         final int numMsgs = 10;
@@ -429,12 +432,12 @@ public void run() {
 
     @Test
     public void testAheadProducerOnHold() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/quotahold"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/quotahold",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
-        final String topic1 = "persistent://prop/usc/quotahold/hold";
+        final String topic1 = "persistent://prop/quotahold/hold";
         final String subName1 = "c1hold";
         final int numMsgs = 10;
 
@@ -467,12 +470,12 @@ public void testAheadProducerOnHold() throws Exception {
 
     @Test
     public void testAheadProducerOnHoldTimeout() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/quotahold"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/quotahold",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
-        final String topic1 = "persistent://prop/usc/quotahold/holdtimeout";
+        final String topic1 = "persistent://prop/quotahold/holdtimeout";
         final String subName1 = "c1holdtimeout";
         boolean gotException = false;
 
@@ -501,12 +504,12 @@ public void testAheadProducerOnHoldTimeout() throws Exception {
 
     @Test
     public void testProducerException() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/quotahold"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/quotahold",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
-        final String topic1 = "persistent://prop/usc/quotahold/except";
+        final String topic1 = "persistent://prop/quotahold/except";
         final String subName1 = "c1except";
         boolean gotException = false;
 
@@ -537,12 +540,12 @@ public void testProducerException() throws Exception {
 
     @Test
     public void testProducerExceptionAndThenUnblock() throws Exception {
-        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/usc/quotahold"), Maps.newTreeMap());
-        admin.namespaces().setBacklogQuota("prop/usc/quotahold",
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newTreeMap());
+        admin.namespaces().setBacklogQuota("prop/quotahold",
                 new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
                 .statsInterval(0, TimeUnit.SECONDS).build();
-        final String topic1 = "persistent://prop/usc/quotahold/exceptandunblock";
+        final String topic1 = "persistent://prop/quotahold/exceptandunblock";
         final String subName1 = "c1except";
         boolean gotException = false;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 5a9642253..5a60ff3f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -77,7 +77,7 @@ protected void cleanup() throws Exception {
     public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType) throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchProducerWithFixedBatchSize";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize";
         final String subscriptionName = "sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -120,7 +120,7 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio
     @Test(dataProvider = "codec")
     public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType) throws Exception {
         int numMsgs = 100;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchProducerWithFixedBatchTime";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime";
         final String subscriptionName = "time-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -155,7 +155,7 @@ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressio
     @Test(dataProvider = "codec")
     public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType) throws Exception {
         int numMsgs = 100;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime";
         final String subscriptionName = "time-size-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -192,7 +192,7 @@ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType com
     public void testBatchProducerWithLargeMessage(CompressionType compressionType) throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/use/finance/testBatchProducerWithLargeMessage";
+        final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage";
         final String subscriptionName = "large-message-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -247,7 +247,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType) t
     public void testSimpleBatchProducerConsumer(CompressionType compressionType) throws Exception {
         int numMsgs = 500;
         int numMsgsInBatch = numMsgs / 20;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchProducerConsumer";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer";
         final String subscriptionName = "pc-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -296,7 +296,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType) thr
     public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize";
         final String subscriptionName = "syncsub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -339,7 +339,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception {
     public void testSimpleBatchProducerConsumer1kMessages() throws Exception {
         int numMsgs = 2000;
         int numMsgsInBatch = 4;
-        final String topicName = "persistent://prop/use/ns-abc/testSimpleBatchProducerConsumer1kMessages";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages";
         final String subscriptionName = "pc1k-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -403,7 +403,7 @@ public void testSimpleBatchProducerConsumer1kMessages() throws Exception {
     public void testOutOfOrderAcksForBatchMessage() throws Exception {
         int numMsgs = 40;
         int numMsgsInBatch = numMsgs / 4;
-        final String topicName = "persistent://prop/use/ns-abc/testOutOfOrderAcksForBatchMessage";
+        final String topicName = "persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage";
         final String subscriptionName = "oooack-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -469,7 +469,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
     public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs;
-        final String topicName = "persistent://prop/use/ns-abc/testNonBatchCumulativeAckAfterBatchPublish";
+        final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish";
         final String subscriptionName = "nbcaabp-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -523,7 +523,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
     public void testBatchAndNonBatchCumulativeAcks() throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 10;
-        final String topicName = "persistent://prop/use/ns-abc/testBatchAndNonBatchCumulativeAcks";
+        final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks";
         final String subscriptionName = "bnb-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -586,7 +586,7 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception {
     @Test(timeOut = 3000)
     public void testConcurrentBatchMessageAck() throws Exception {
         int numMsgs = 10;
-        final String topicName = "persistent://prop/use/ns-abc/testConcurrentAck";
+        final String topicName = "persistent://prop/ns-abc/testConcurrentAck";
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 13e605702..c375635b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -72,6 +72,7 @@
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
@@ -100,7 +101,7 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testOwnedNsCheck() throws Exception {
-        final String topic = "persistent://prop/use/ns-abc/successTopic";
+        final String topic = "persistent://prop/ns-abc/successTopic";
         BrokerService service = pulsar.getBrokerService();
 
         final CountDownLatch latch1 = new CountDownLatch(1);
@@ -134,7 +135,7 @@ public void testOwnedNsCheck() throws Exception {
 
     @Test
     public void testBrokerServicePersistentTopicStats() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/successTopic";
+        final String topicName = "persistent://prop/ns-abc/successTopic";
         final String subName = "successSub";
 
         PersistentTopicStats stats;
@@ -211,7 +212,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
 
     @Test
     public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/successSharedTopic";
+        final String topicName = "persistent://prop/ns-abc/successSharedTopic";
         final String subName = "successSharedSub";
 
         PersistentTopicStats stats;
@@ -300,7 +301,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
 
     @Test
     public void testBrokerStatsMetrics() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/newTopic";
+        final String topicName = "persistent://prop/ns-abc/newTopic";
         final String subName = "newSub";
         BrokerStats brokerStatsClient = admin.brokerStats();
 
@@ -323,7 +324,6 @@ public void testBrokerStatsMetrics() throws Exception {
         consumer.close();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         JsonArray metrics = brokerStatsClient.getMetrics();
-        assertEquals(metrics.size(), 5, metrics.toString());
 
         // these metrics seem to be arriving in different order at different times...
         // is the order really relevant here?
@@ -332,10 +332,10 @@ public void testBrokerStatsMetrics() throws Exception {
         for (int i = 0; i < metrics.size(); i++) {
             try {
                 String data = metrics.get(i).getAsJsonObject().get("dimensions").toString();
-                if (!namespaceDimensionFound && data.contains("prop/use/ns-abc")) {
+                if (!namespaceDimensionFound && data.contains("prop/ns-abc")) {
                     namespaceDimensionFound = true;
                 }
-                if (!topicLoadTimesDimensionFound && data.contains("prop/use/ns-abc")) {
+                if (!topicLoadTimesDimensionFound && data.contains("prop/ns-abc")) {
                     topicLoadTimesDimensionFound = true;
                 }
             } catch (Exception e) {
@@ -350,8 +350,8 @@ public void testBrokerStatsMetrics() throws Exception {
     @Test
     public void testBrokerServiceNamespaceStats() throws Exception {
         final int numBundles = 4;
-        final String ns1 = "prop/use/stats1";
-        final String ns2 = "prop/use/stats2";
+        final String ns1 = "prop/stats1";
+        final String ns2 = "prop/stats2";
 
         List<String> nsList = Lists.newArrayList(ns1, ns2);
         List<Producer<byte[]>> producerList = Lists.newArrayList();
@@ -360,6 +360,7 @@ public void testBrokerServiceNamespaceStats() throws Exception {
 
         for (String ns : nsList) {
             admin.namespaces().createNamespace(ns, numBundles);
+            admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
             String topic1 = String.format("persistent://%s/topic1", ns);
             producerList.add(pulsarClient.newProducer().topic(topic1).create());
             String topic2 = String.format("persistent://%s/topic2", ns);
@@ -402,7 +403,7 @@ public void testBrokerServiceNamespaceStats() throws Exception {
 
     @Test
     public void testTlsDisabled() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/newTopic";
+        final String topicName = "persistent://prop/ns-abc/newTopic";
         final String subName = "newSub";
         PulsarClient pulsarClient = null;
 
@@ -442,7 +443,7 @@ public void testTlsDisabled() throws Exception {
 
     @Test
     public void testTlsEnabled() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/newTopic";
+        final String topicName = "persistent://prop/ns-abc/newTopic";
         final String subName = "newSub";
 
         conf.setAuthenticationEnabled(false);
@@ -515,7 +516,7 @@ public void testTlsEnabled() throws Exception {
     @SuppressWarnings("deprecation")
     @Test
     public void testTlsAuthAllowInsecure() throws Exception {
-        final String topicName = "persistent://prop/usw/my-ns/newTopic";
+        final String topicName = "persistent://prop/ns-abc/newTopic";
         final String subName = "newSub";
         Authentication auth;
 
@@ -574,7 +575,7 @@ public void testTlsAuthAllowInsecure() throws Exception {
     @SuppressWarnings("deprecation")
     @Test
     public void testTlsAuthDisallowInsecure() throws Exception {
-        final String topicName = "persistent://prop/usw/my-ns/newTopic";
+        final String topicName = "persistent://prop/my-ns/newTopic";
         final String subName = "newSub";
         Authentication auth;
 
@@ -632,7 +633,7 @@ public void testTlsAuthDisallowInsecure() throws Exception {
     @SuppressWarnings("deprecation")
     @Test
     public void testTlsAuthUseTrustCert() throws Exception {
-        final String topicName = "persistent://prop/usw/my-ns/newTopic";
+        final String topicName = "persistent://prop/ns-abc/newTopic";
         final String subName = "newSub";
         Authentication auth;
 
@@ -693,7 +694,7 @@ public void testTlsAuthUseTrustCert() throws Exception {
      */
     @Test
     public void testLookupThrottlingForClientByClient() throws Exception {
-        final String topicName = "persistent://prop/usw/my-ns/newTopic";
+        final String topicName = "persistent://prop/my-ns/newTopic";
 
         String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
@@ -709,8 +710,9 @@ public void testLookupThrottlingForClientByClient() throws Exception {
 
     @Test
     public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
-        final String namespace = "prop/use/disableBundle";
+        final String namespace = "prop/disableBundle";
         admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
 
         // own namespace bundle
         final String topicName = "persistent://" + namespace + "/my-topic";
@@ -743,7 +745,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
      */
     @Test(timeOut = 3000)
     public void testTopicFailureShouldNotHaveDeadLock() {
-        final String namespace = "prop/usw/my-ns";
+        final String namespace = "prop/ns-abc";
         final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";
 
         // let this broker own this namespace bundle by creating a topic
@@ -786,7 +788,7 @@ public void testTopicFailureShouldNotHaveDeadLock() {
 
     @Test
     public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
-        final String namespace = "prop/usw/my-ns";
+        final String namespace = "prop/ns-abc";
         final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";
 
         // let this broker own this namespace bundle by creating a topic
@@ -844,9 +846,11 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
      */
     @Test
     public void testCreateNamespacePolicy() throws Exception {
-        final String namespace = "prop/use/testPolicy";
+        final String namespace = "prop/testPolicy";
         final int totalBundle = 3;
+        System.err.println("----------------");
         admin.namespaces().createNamespace(namespace, new BundlesData(totalBundle));
+
         String globalPath = joinPath(LOCAL_POLICIES_ROOT, namespace);
         pulsar.getLocalZkCacheService().policiesCache().clear();
         Optional<LocalPolicies> policy = pulsar.getLocalZkCacheService().policiesCache().get(globalPath);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index fd078fc0c..3655bbc8b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -38,10 +38,11 @@ protected PulsarService getPulsar() {
 
     public void baseSetup() throws Exception {
         super.internalSetup();
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
         admin.properties().createProperty("prop",
-                new PropertyAdmin(Sets.newHashSet("appid1"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("prop/use/ns-abc");
+                new PropertyAdmin(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop/ns-abc");
+        admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test"));
     }
 
     void rolloverPerIntervalStats() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index e03c240d3..c04e3bd10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -25,6 +25,8 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
@@ -90,7 +92,7 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testSimpleProducerEvents() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic0";
+        final String topicName = "persistent://prop/ns-abc/topic0";
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -117,7 +119,7 @@ public void testSimpleProducerEvents() throws Exception {
 
     @Test
     public void testSimpleConsumerEvents() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic1";
+        final String topicName = "persistent://prop/ns-abc/topic1";
         final String subName = "sub1";
         final int numMsgs = 10;
 
@@ -200,7 +202,7 @@ public void testSimpleConsumerEvents() throws Exception {
 
     @Test
     public void testConsumerFlowControl() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic2";
+        final String topicName = "persistent://prop/ns-abc/topic2";
         final String subName = "sub2";
 
         Message<byte[]> msg;
@@ -243,7 +245,7 @@ public void testConsumerFlowControl() throws Exception {
      */
     @Test
     public void testActiveSubscriptionWithCache() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic2";
+        final String topicName = "persistent://prop/ns-abc/topic2";
         final String subName = "sub2";
 
         Message<byte[]> msg;
@@ -297,7 +299,7 @@ public void testActiveSubscriptionWithCache() throws Exception {
     @Test(enabled = false)
     public void testConcurrentConsumerThreads() throws Exception {
         // test concurrent consumer threads on same consumerId
-        final String topicName = "persistent://prop/use/ns-abc/topic3";
+        final String topicName = "persistent://prop/ns-abc/topic3";
         final String subName = "sub3";
 
         final int recvQueueSize = 100;
@@ -347,7 +349,7 @@ public Void call() throws Exception {
     @Test(enabled = false)
     // TODO: enable this after java client supports graceful close
     public void testGracefulClose() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic4";
+        final String topicName = "persistent://prop/ns-abc/topic4";
         final String subName = "sub4";
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -409,7 +411,7 @@ public void testGracefulClose() throws Exception {
 
     @Test
     public void testSimpleCloseTopic() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic5";
+        final String topicName = "persistent://prop/ns-abc/topic5";
         final String subName = "sub5";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
@@ -437,7 +439,7 @@ public void testSimpleCloseTopic() throws Exception {
 
     @Test
     public void testSingleClientMultipleSubscriptions() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic6";
+        final String topicName = "persistent://prop/ns-abc/topic6";
         final String subName = "sub6";
 
         pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
@@ -452,7 +454,7 @@ public void testSingleClientMultipleSubscriptions() throws Exception {
 
     @Test
     public void testMultipleClientsMultipleSubscriptions() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic7";
+        final String topicName = "persistent://prop/ns-abc/topic7";
         final String subName = "sub7";
 
         PulsarClient client1 = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
@@ -476,7 +478,7 @@ public void testMultipleClientsMultipleSubscriptions() throws Exception {
 
     @Test
     public void testTopicDeleteWithDisconnectedSubscription() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic8";
+        final String topicName = "persistent://prop/ns-abc/topic8";
         final String subName = "sub1";
 
         // 1. client connect
@@ -508,7 +510,7 @@ int getAvailablePermits(PersistentSubscription sub) {
 
     @Test(enabled = false)
     public void testUnloadNamespace() throws Exception {
-        String topic = "persistent://prop/use/ns-abc/topic-9";
+        String topic = "persistent://prop/ns-abc/topic-9";
         TopicName topicName = TopicName.get(topic);
         pulsarClient.newProducer().topic(topic).create();
         pulsarClient.close();
@@ -517,7 +519,7 @@ public void testUnloadNamespace() throws Exception {
         assertTrue(((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getManagedLedgers()
                 .containsKey(topicName.getPersistenceNamingEncoding()));
 
-        admin.namespaces().unload("prop/use/ns-abc");
+        admin.namespaces().unload("prop/ns-abc");
 
         int i = 0;
         for (i = 0; i < 30; i++) {
@@ -538,7 +540,7 @@ public void testUnloadNamespace() throws Exception {
     @Test
     public void testGC() throws Exception {
         // 1. Simple successful GC
-        String topicName = "persistent://prop/use/ns-abc/topic-10";
+        String topicName = "persistent://prop/ns-abc/topic-10";
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         producer.close();
 
@@ -574,10 +576,10 @@ public void testGC() throws Exception {
     public void testGcAndRetentionPolicy() throws Exception {
 
         // Retain data for at-least 10min
-        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(10, 10));
+        admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(10, 10));
 
         // 1. Simple successful GC
-        String topicName = "persistent://prop/use/ns-abc/topic-10";
+        String topicName = "persistent://prop/ns-abc/topic-10";
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         producer.close();
 
@@ -587,7 +589,7 @@ public void testGcAndRetentionPolicy() throws Exception {
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // Remove retention
-        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(0, 10));
+        admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(0, 10));
         Thread.sleep(300);
 
         // 2. Topic is not GCed with live connection
@@ -617,10 +619,10 @@ public void testGcAndRetentionPolicy() throws Exception {
     @Test
     public void testInfiniteRetentionPolicy() throws Exception {
         // Retain data forever
-        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(-1, -1));
+        admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(-1, -1));
 
         // 1. Simple successful GC
-        String topicName = "persistent://prop/use/ns-abc/topic-10";
+        String topicName = "persistent://prop/ns-abc/topic-10";
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         producer.close();
 
@@ -630,7 +632,7 @@ public void testInfiniteRetentionPolicy() throws Exception {
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         // Remove retention
-        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(0, 10));
+        admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(0, 10));
         Thread.sleep(300);
 
         // 2. Topic is not GCed with live connection
@@ -656,12 +658,13 @@ public void testInfiniteRetentionPolicy() throws Exception {
     @Test
     public void testMessageExpiry() throws Exception {
         int messageTTLSecs = 1;
-        String namespaceName = "prop/use/expiry-check";
+        String namespaceName = "prop/expiry-check";
 
         admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
         admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
 
-        final String topicName = "persistent://prop/use/expiry-check/topic1";
+        final String topicName = "persistent://prop/expiry-check/topic1";
         final String subName = "sub1";
         final int numMsgs = 10;
 
@@ -699,12 +702,13 @@ public void testMessageExpiry() throws Exception {
     @Test
     public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
         int messageTTLSecs = 10;
-        String namespaceName = "prop/use/expiry-check-1";
+        String namespaceName = "prop/expiry-check-1";
 
         admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
         admin.namespaces().setNamespaceMessageTTL(namespaceName, messageTTLSecs);
 
-        final String topicName = "persistent://prop/use/expiry-check-1/topic1";
+        final String topicName = "persistent://prop/expiry-check-1/topic1";
         final String subName = "sub1";
         final int numMsgs = 10;
 
@@ -737,7 +741,7 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
 
     @Test
     public void testSubscriptionTypeTransitions() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/shared-topic2";
+        final String topicName = "persistent://prop/ns-abc/shared-topic2";
         final String subName = "sub2";
 
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
@@ -811,7 +815,7 @@ public void testSubscriptionTypeTransitions() throws Exception {
 
     @Test
     public void testReceiveWithTimeout() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic-receive-timeout";
+        final String topicName = "persistent://prop/ns-abc/topic-receive-timeout";
         final String subName = "sub";
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
@@ -840,7 +844,7 @@ public void testReceiveWithTimeout() throws Exception {
 
     @Test
     public void testProducerReturnedMessageId() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic-xyz";
+        final String topicName = "persistent://prop/ns-abc/topic-xyz";
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -886,7 +890,7 @@ public void testProducerReturnedMessageId() throws Exception {
 
     @Test
     public void testProducerQueueFullBlocking() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic-xyzx";
+        final String topicName = "persistent://prop/ns-abc/topic-xyzx";
         final int messages = 10;
 
         PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
@@ -928,7 +932,7 @@ public void testProducerQueueFullBlocking() throws Exception {
 
     @Test
     public void testProducerQueueFullNonBlocking() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic-xyzx";
+        final String topicName = "persistent://prop/ns-abc/topic-xyzx";
         final int messages = 10;
 
         // 1. Producer connect
@@ -976,30 +980,30 @@ public void testDeleteTopics() throws Exception {
         BrokerService brokerService = pulsar.getBrokerService();
 
         // 1. producers connect
-        Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic-1").create();
-        /* Producer<byte[]> producer2 = */ pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic-2")
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").create();
+        /* Producer<byte[]> producer2 = */ pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2")
                 .create();
 
         brokerService.updateRates();
 
         Map<String, NamespaceBundleStats> bundleStatsMap = brokerService.getBundleStats();
         assertEquals(bundleStatsMap.size(), 1);
-        NamespaceBundleStats bundleStats = bundleStatsMap.get("prop/use/ns-abc/0x00000000_0xffffffff");
+        NamespaceBundleStats bundleStats = bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
         assertNotNull(bundleStats);
 
         producer1.close();
-        admin.persistentTopics().delete("persistent://prop/use/ns-abc/topic-1");
+        admin.persistentTopics().delete("persistent://prop/ns-abc/topic-1");
 
         brokerService.updateRates();
 
         bundleStatsMap = brokerService.getBundleStats();
         assertEquals(bundleStatsMap.size(), 1);
-        bundleStats = bundleStatsMap.get("prop/use/ns-abc/0x00000000_0xffffffff");
+        bundleStats = bundleStatsMap.get("prop/ns-abc/0x00000000_0xffffffff");
         assertNotNull(bundleStats);
 
         // // Delete 2nd topic as well
         // producer2.close();
-        // admin.persistentTopics().delete("persistent://prop/use/ns-abc/topic-2");
+        // admin.persistentTopics().delete("persistent://prop/ns-abc/topic-2");
         //
         // brokerService.updateRates();
         //
@@ -1014,7 +1018,7 @@ public void testDeleteTopics() throws Exception {
 
     @Test(dataProvider = "codec")
     public void testCompression(CompressionType compressionType) throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic0" + compressionType;
+        final String topicName = "persistent://prop/ns-abc/topic0" + compressionType;
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
@@ -1052,7 +1056,7 @@ public void testBrokerTopicStats() throws Exception {
         // disable statsUpdate to calculate rates explicitly
         statsUpdater.shutdown();
 
-        final String namespace = "prop/use/ns-abc";
+        final String namespace = "prop/ns-abc";
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/topic0").create();
         // 1. producer publish messages
         for (int i = 0; i < 10; i++) {
@@ -1079,7 +1083,7 @@ public void testBrokerTopicStats() throws Exception {
 
     @Test
     public void testPayloadCorruptionDetection() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic1";
+        final String topicName = "persistent://prop/ns-abc/topic1";
 
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -1129,7 +1133,7 @@ public void testPayloadCorruptionDetection() throws Exception {
      */
     @Test()
     public void testMessageRedelivery() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic2";
+        final String topicName = "persistent://prop/ns-abc/topic2";
         final String subName = "sub2";
 
         Message<byte[]> msg;
@@ -1183,7 +1187,7 @@ public void testMessageRedelivery() throws Exception {
     @Test
     public void testMessageReplay() throws Exception {
 
-        final String topicName = "persistent://prop/use/ns-abc/topic2";
+        final String topicName = "persistent://prop/ns-abc/topic2";
         final String subName = "sub2";
 
         Message<byte[]> msg;
@@ -1252,7 +1256,7 @@ public void testMessageReplay() throws Exception {
 
     @Test
     public void testCreateProducerWithSameName() throws Exception {
-        String topic = "persistent://prop/use/ns-abc/testCreateProducerWithSameName";
+        String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName";
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                 .producerName("test-producer-a");
@@ -1275,7 +1279,7 @@ public void testCreateProducerWithSameName() throws Exception {
 
     @Test
     public void testGetOrCreateTopic() throws Exception {
-        String topicName = "persistent://prop/use/ns-abc/testGetOrCreateTopic";
+        String topicName = "persistent://prop/ns-abc/testGetOrCreateTopic";
 
         admin.lookups().lookupTopic(topicName);
         Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -1287,7 +1291,7 @@ public void testGetOrCreateTopic() throws Exception {
 
     @Test
     public void testGetTopicIfExists() throws Exception {
-        String topicName = "persistent://prop/use/ns-abc/testGetTopicIfExists";
+        String topicName = "persistent://prop/ns-abc/testGetTopicIfExists";
         admin.lookups().lookupTopic(topicName);
         Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
         assertFalse(topic.isPresent());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 696a33181..549d2b79c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -61,7 +61,7 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
-    private final String topicName = "persistent://prop/use/ns-abc/topic0";
+    private final String topicName = "persistent://prop/ns-abc/topic0";
 
     @Test
     public void testSimpleTermination() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 219e70300..3fff251b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -93,7 +93,7 @@ protected void setup() throws Exception {
         System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName("test");
 
         super.init();
     }
@@ -126,10 +126,10 @@ protected void cleanup() throws Exception {
     }
 
     public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
                 .subscriptionName("my-subscriber-name").subscribe();
 
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic");
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -169,8 +169,8 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
         internalSetup(authTls);
 
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
 
@@ -185,8 +185,8 @@ public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throw
         internalSetup(authPassword);
 
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet(), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet(), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
 
@@ -201,8 +201,8 @@ public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws
         internalSetup(authPassword);
 
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet(), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet(), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
 
@@ -220,16 +220,16 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("anonymousUser"), Sets.newHashSet("use")));
+                new PropertyAdmin(Sets.newHashSet("anonymousUser"), Sets.newHashSet("test")));
 
         // make a PulsarAdmin instance as "anonymousUser" for http request
         admin.close();
         admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
-        admin.namespaces().createNamespace("my-property/use/my-ns");
-        admin.persistentTopics().grantPermission("persistent://my-property/use/my-ns/my-topic", "anonymousUser",
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+        admin.persistentTopics().grantPermission("persistent://my-property/my-ns/my-topic", "anonymousUser",
                 EnumSet.allOf(AuthAction.class));
 
         // setup the client
@@ -240,7 +240,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
         // unauthorized topic test
         Exception pulsarClientException = null;
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/other-topic")
+            pulsarClient.newConsumer().topic("persistent://my-property/my-ns/other-topic")
                     .subscriptionName("my-subscriber-name").subscribe();
         } catch (Exception e) {
             pulsarClientException = e;
@@ -268,7 +268,7 @@ public void testAuthenticationFilterNegative() throws Exception {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        final String cluster = "use";
+        final String cluster = "test";
         final ClusterData clusterData = new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS);
         // this will cause NPE and it should throw 500
@@ -299,12 +299,12 @@ public void testInternalServerExceptionOnLookup() throws Exception {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        String namespace = "my-property/use/my-ns";
-        admin.namespaces().createNamespace(namespace);
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        String namespace = "my-property/my-ns";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
 
         String topic = "persistent://" + namespace + "1/topic1";
         // this will cause NPE and it should throw 500
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index 13d5ad299..3d331f0d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -91,7 +91,7 @@ protected void setup() throws Exception {
         System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName("test");
 
         super.init();
 
@@ -117,8 +117,8 @@ protected void setupClient() throws Exception {
                 .build();
 
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
     }
 
     @AfterMethod
@@ -161,7 +161,7 @@ public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostname
         setup();
 
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic")
+            pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
                     .subscriptionName("my-subscriber-name").subscribe();
             if (hostnameVerificationEnabled) {
                 Assert.fail("Connection should be failed due to hostnameVerification enabled");
@@ -197,10 +197,10 @@ public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception {
 
         setup();
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
                 .subscriptionName("my-subscriber-name").subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 605bb3214..be04c2cf9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -71,7 +71,7 @@ protected void setup() throws Exception {
         providers.add(TestAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName("test");
 
         super.init();
     }
@@ -115,27 +115,27 @@ public void testProducerAndConsumerAuthorization() throws Exception {
                 .authentication(authenticationInvalidRole).build();
 
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
         // (1) Valid Producer and consumer creation
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic")
                 .create();
         consumer.close();
         producer.close();
 
         // (2) InValid user auth-role will be rejected by authorization service
         try {
-            consumer = pulsarClientInvalidRole.newConsumer().topic("persistent://my-property/use/my-ns/my-topic")
+            consumer = pulsarClientInvalidRole.newConsumer().topic("persistent://my-property/my-ns/my-topic")
                     .subscriptionName("my-subscriber-name").subscribe();
             Assert.fail("should have failed with authorization error");
         } catch (PulsarClientException.AuthorizationException pa) {
             // Ok
         }
         try {
-            producer = pulsarClientInvalidRole.newProducer().topic("persistent://my-property/use/my-ns/my-topic")
+            producer = pulsarClientInvalidRole.newProducer().topic("persistent://my-property/my-ns/my-topic")
                     .create();
             Assert.fail("should have failed with authorization error");
         } catch (PulsarClientException.AuthorizationException pa) {
@@ -164,17 +164,17 @@ public void testSubscriptionPrefixAuthorization() throws Exception {
         pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
 
         admin.properties().createProperty("prop-prefix",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("prop-prefix/use/ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop-prefix/ns", Sets.newHashSet("test"));
 
         // (1) Valid subscription name will be approved by authorization service
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-prefix/use/ns/t1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-prefix/ns/t1")
                 .subscriptionName(clientRole + "-sub1").subscribe();
         consumer.close();
 
         // (2) InValid subscription name will be rejected by authorization service
         try {
-            consumer = pulsarClient.newConsumer().topic("persistent://prop-prefix/use/ns/t1").subscriptionName("sub1")
+            consumer = pulsarClient.newConsumer().topic("persistent://prop-prefix/ns/t1").subscriptionName("sub1")
                     .subscribe();
             Assert.fail("should have failed with authorization error");
         } catch (PulsarClientException.AuthorizationException pa) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index d5759f04a..9b6281935 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -174,9 +174,9 @@ public void testMultipleBrokerLookup() throws Exception {
         PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build();
 
         // load namespace-bundle by calling Broker2
-        Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient2.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
 
         for (int i = 0; i < 10; i++) {
@@ -304,7 +304,7 @@ public void testPartitionTopicLookup() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 8;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1");
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
@@ -429,7 +429,7 @@ public void testWebserviceServiceTls() throws Exception {
         URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
         PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build();
 
-        final String lookupResourceUrl = "/lookup/v2/destination/persistent/my-property/use/my-ns/my-topic1";
+        final String lookupResourceUrl = "/lookup/v2/topic/persistent/my-property/my-ns/my-topic1";
 
         // set client cert_key file
         KeyManager[] keyManagers = null;
@@ -797,7 +797,7 @@ public void testSplitUnloadLookupTest() throws Exception {
 
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/my-ns";
+        final String namespace = "my-property/my-ns";
         // (1) Start broker-1
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setAdvertisedAddress("localhost");
@@ -902,7 +902,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
         final String loadBalancerName = conf.getLoadManagerClassName();
 
         try {
-            final String namespace = "my-property/use/my-ns";
+            final String namespace = "my-property/my-ns";
             // (1) Start broker-1
             ServiceConfiguration conf2 = new ServiceConfiguration();
             conf2.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index de8c24c7d..8b996d748 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -43,8 +43,8 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testProducerSequenceAfterReconnect() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/testProducerSequenceAfterReconnect";
-        admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true);
+        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
+        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                 .producerName("my-producer-name");
@@ -74,8 +74,8 @@ public void testProducerSequenceAfterReconnect() throws Exception {
 
     @Test
     public void testProducerSequenceAfterRestart() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/testProducerSequenceAfterRestart";
-        admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true);
+        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterRestart";
+        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                 .producerName("my-producer-name");
@@ -108,8 +108,8 @@ public void testProducerSequenceAfterRestart() throws Exception {
 
     @Test(timeOut = 30000)
     public void testProducerDeduplication() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/testProducerDeduplication";
-        admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true);
+        String topic = "persistent://my-property/my-ns/testProducerDeduplication";
+        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
 
         // Set infinite timeout
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 696e21fbd..e0c46acfa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -101,7 +101,7 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
             final int unackMsgAllowed = 100;
             final int receiverQueueSize = 10;
             final int totalProducedMsgs = 200;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName = "subscriber-1";
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
@@ -114,7 +114,7 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
             List<Consumer<?>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -207,7 +207,7 @@ public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except
             final int unackMsgAllowed = 100;
             final int totalProducedMsgs = 200;
             final int receiverQueueSize = 10;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName = "subscriber-1";
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
@@ -220,7 +220,7 @@ public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except
             List<ConsumerImpl<byte[]>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -307,7 +307,7 @@ public void testCloseConsumerBlockedDispatcher() throws Exception {
             final int unackMsgAllowed = 100;
             final int receiverQueueSize = 10;
             final int totalProducedMsgs = 200;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName = "subscriber-1";
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
@@ -315,7 +315,7 @@ public void testCloseConsumerBlockedDispatcher() throws Exception {
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -393,7 +393,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception {
             final int unackMsgAllowed = 100;
             final int receiverQueueSize = 10;
             final int totalProducedMsgs = 200;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName = "subscriber-1";
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
@@ -406,7 +406,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception {
             List<ConsumerImpl<?>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -592,14 +592,14 @@ public void testBlockDispatcherStats() throws Exception {
     public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+        final String topicName = "persistent://my-property/my-ns/unacked-topic";
         final String subscriberName = "subscriber-1";
         final int totalProducedMsgs = 500;
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName)
                 .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
                 .create();
 
         CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
@@ -694,7 +694,7 @@ public void testBlockBrokerDispatching() throws Exception {
 
             final int receiverQueueSize = 10;
             final int totalProducedMsgs = maxUnAckPerBroker * 3;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName1 = "subscriber-1";
             final String subscriberName2 = "subscriber-2";
             final String subscriberName3 = "subscriber-3";
@@ -713,7 +713,7 @@ public void testBlockBrokerDispatching() throws Exception {
             consumer1Sub3.close();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // continuously checks unack-message dispatching
             ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
@@ -894,7 +894,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
 
             final int receiverQueueSize = 10;
             final int totalProducedMsgs = maxUnAckPerBroker * 3;
-            final String topicName = "persistent://my-property/use/my-ns/unacked-topic";
+            final String topicName = "persistent://my-property/my-ns/unacked-topic";
             final String subscriberName1 = "subscriber-1";
             final String subscriberName2 = "subscriber-2";
 
@@ -913,7 +913,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
                     TimeUnit.MILLISECONDS);
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 7bd6997ac..0c2a37ed4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -49,7 +49,7 @@
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
-        this.conf.setClusterName("use");
+        this.conf.setClusterName("test");
     }
 
     @AfterMethod
@@ -102,10 +102,10 @@ public void testMessageRateDynamicallyChange() throws Exception {
 
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -163,7 +163,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr
             DispatchRateType dispatchRateType) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
 
         final int messageRate = 100;
@@ -174,7 +174,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr
             dispatchRate = new DispatchRate(-1, messageRate, 360);
         }
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -233,7 +233,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr
     public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
         final int messageRate = 5;
         final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered
@@ -251,7 +251,7 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
         }
         Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -305,12 +305,12 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
             throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
 
         final int messageRate = 10;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -376,12 +376,12 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
     public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
 
         final int byteRate = 100;
         DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -439,12 +439,12 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
     public void testRateLimitingMultipleConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";
 
         final int messageRate = 5;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -509,7 +509,7 @@ public void testRateLimitingMultipleConsumers() throws Exception {
     public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
         final int messageRate = 5;
 
@@ -525,7 +525,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
         }
         Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -571,13 +571,13 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
     public void testMessageByteRateThrottlingCombined(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
 
         final int messageRate = 5; // 5 msgs per second
         final long byteRate = 10; // 10 bytes per second
         DispatchRate dispatchRate = new DispatchRate(messageRate, byteRate, 360);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -645,7 +645,7 @@ public void testMessageByteRateThrottlingCombined(SubscriptionType subscription)
     public void testGlobalNamespaceThrottling() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/global/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
 
         final int messageRate = 5;
@@ -653,7 +653,7 @@ public void testGlobalNamespaceThrottling() throws Exception {
 
         admin.clusters().createCluster("global", new ClusterData("http://global:8080"));
         admin.namespaces().createNamespace(namespace);
-        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("use"));
+        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
 
         // create producer and topic
@@ -715,13 +715,13 @@ public void testGlobalNamespaceThrottling() throws Exception {
     public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
 
         final int messageRate = 10;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnNonBacklogConsumerEnabled",
                 Boolean.TRUE.toString());
@@ -790,7 +790,7 @@ public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscri
     public void testClusterPolicyOverrideConfiguration() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
         final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
         final int clusterMessageRate = 100;
@@ -807,7 +807,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
         }
         Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index d18c3d2d7..52deb52f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -103,7 +103,7 @@ protected void cleanup() throws Exception {
     public void testNonPersistentTopic(SubscriptionType type) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topic = "non-persistent://my-property/use/my-ns/unacked-topic";
+        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("subscriber-1").subscriptionType(type).subscribe();
 
@@ -142,7 +142,7 @@ public void testNonPersistentTopic(SubscriptionType type) throws Exception {
     public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic";
+        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
         admin.nonPersistentTopics().createPartitionedTopic(topic, 5);
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1")
                 .subscriptionType(type).subscribe();
@@ -183,7 +183,7 @@ public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type
         log.info("-- Starting {} test --", methodName);
 
         final int numPartitions = 5;
-        final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic";
+        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
         admin.nonPersistentTopics().createPartitionedTopic(topic, numPartitions);
 
         PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT)
@@ -235,7 +235,7 @@ public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type
     public void testConsumerInternalQueueMaxOut(SubscriptionType type) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topic = "non-persistent://my-property/use/my-ns/unacked-topic";
+        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
         final int queueSize = 10;
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                 .receiverQueueSize(queueSize).subscriptionName("subscriber-1").subscriptionType(type).subscribe();
@@ -278,7 +278,7 @@ public void testConsumerInternalQueueMaxOut(SubscriptionType type) throws Except
     public void testProducerRateLimit() throws Exception {
         int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         try {
-            final String topic = "non-persistent://my-property/use/my-ns/unacked-topic";
+            final String topic = "non-persistent://my-property/my-ns/unacked-topic";
             // restart broker with lower publish rate limit
             conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
             stopBroker();
@@ -337,7 +337,7 @@ public void testProducerRateLimit() throws Exception {
     public void testMultipleSubscription() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topic = "non-persistent://my-property/use/my-ns/unacked-topic";
+        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
         ConsumerImpl<byte[]> consumer1Shared = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe();
 
@@ -415,7 +415,7 @@ public void testMultipleSubscription() throws Exception {
     @Test
     public void testTopicStats() throws Exception {
 
-        final String topicName = "non-persistent://my-property/use/my-ns/unacked-topic";
+        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
         final String subName = "non-persistent";
         final int timeWaitToSync = 100;
 
@@ -620,7 +620,7 @@ public void testReplicator() throws Exception {
     @Test(dataProvider = "loadManager")
     public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception {
 
-        final String namespace = "my-property/use/my-ns";
+        final String namespace = "my-property/my-ns";
         final String topicName = "non-persistent://" + namespace + "/loadManager";
         final String defaultLoadManagerName = conf.getLoadManagerClassName();
         final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
@@ -675,7 +675,7 @@ public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadM
     @Test
     public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
 
-        final String namespace = "my-property/use/my-ns";
+        final String namespace = "my-property/my-ns";
         final String topicName = "non-persistent://" + namespace + "/persitentNamespace";
 
         final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
@@ -707,7 +707,7 @@ public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
     @Test(dataProvider = "loadManager")
     public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception {
 
-        final String namespace = "my-property/use/my-ns";
+        final String namespace = "my-property/my-ns";
         final String topicName = "persistent://" + namespace + "/loadManager";
         final String defaultLoadManagerName = conf.getLoadManagerClassName();
         final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics();
@@ -767,7 +767,7 @@ public void testMsgDropStat() throws Exception {
 
         int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         try {
-            final String topicName = "non-persistent://my-property/use/my-ns/stats-topic";
+            final String topicName = "non-persistent://my-property/my-ns/stats-topic";
             // restart broker with lower publish rate limit
             conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
             stopBroker();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 0708229e2..583026585 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -68,7 +68,7 @@ public void testRoundRobinProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1");
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
@@ -107,9 +107,9 @@ public void testPartitionedTopicNameWithSpecialCharacter() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 4;
-        final String specialCharacter = "! * ' ( ) ; : @ & = + $ , /\\ ? % # [ ]";
+        final String specialCharacter = "! * ' ( ) ; : @ & = + $ , \\ ? % # [ ]";
         TopicName topicName = TopicName
-                .get("persistent://my-property/use/my-ns/my-partitionedtopic1" + specialCharacter);
+                .get("persistent://my-property/my-ns/my-partitionedtopic1" + specialCharacter);
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
         // Try to create producer which does lookup and create connection with broker
@@ -125,7 +125,7 @@ public void testSinglePartitionProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic2");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic2");
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
@@ -166,7 +166,7 @@ public void testKeyBasedProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic3");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic3");
         String dummyKey1 = "dummykey1";
         String dummyKey2 = "dummykey2";
 
@@ -221,7 +221,7 @@ public void testInvalidSequence() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic4");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic4");
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
@@ -268,7 +268,7 @@ public void testInvalidSequence() throws Exception {
     public void testSillyUser() throws Exception {
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic5");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic5");
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
         Producer<byte[]> producer = null;
@@ -312,7 +312,7 @@ public void testSillyUser() throws Exception {
     @Test(timeOut = 30000)
     public void testDeletePartitionedTopic() throws Exception {
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic6");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic6");
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()).create();
@@ -339,7 +339,7 @@ public void testAsyncPartitionedProducerConsumer() throws Exception {
         final Set<String> consumeMsgs = Sets.newHashSet();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1");
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
@@ -386,7 +386,7 @@ public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception
         final Set<String> consumeMsgs = Sets.newHashSet();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/my-partitionedtopic1");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1");
 
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
 
@@ -435,7 +435,7 @@ public void testFairDistributionForPartitionConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int numPartitions = 2;
-        final String topicName = "persistent://my-property/use/my-ns/my-topic";
+        final String topicName = "persistent://my-property/my-ns/my-topic";
         final String producer1Msg = "producer1";
         final String producer2Msg = "producer2";
         final int queueSize = 10;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index 2960bb3df..aad1234fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.Method;
 import java.util.Set;
 
@@ -27,8 +29,6 @@
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 
-import com.google.common.collect.Sets;
-
 public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
     protected String methodName;
 
@@ -38,10 +38,11 @@ public void beforeMethod(Method m) throws Exception {
     }
 
     public void producerBaseSetup() throws Exception {
-        admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("test"));
     }
 
     protected void testMessageOrderAndDuplicates(Set<String> messagesReceived, String receivedMessage,
@@ -53,5 +54,5 @@ protected void testMessageOrderAndDuplicates(Set<String> messagesReceived, Strin
         // Make sure that there are no duplicates
         Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
     }
-    
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b7c2fd4b0..b5dcd87dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -94,11 +94,11 @@ protected void cleanup() throws Exception {
     public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic1");
+                .topic("persistent://my-property/my-ns/my-topic1");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -130,11 +130,11 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio
     @Test(dataProvider = "batch")
     public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic2");
+                .topic("persistent://my-property/my-ns/my-topic2");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -181,7 +181,7 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception {
         int numMessages = 100;
         final CountDownLatch latch = new CountDownLatch(numMessages);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic3")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic3")
                 .subscriptionName("my-subscriber-name").messageListener((c1, msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
                     String receivedMessage = new String(msg.getData());
@@ -191,7 +191,7 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception {
                 }).subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic3");
+                .topic("persistent://my-property/my-ns/my-topic3");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -223,10 +223,10 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception {
     public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
         // Create consumer and producer
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic4")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic4")
                 .subscriptionName("my-subscriber-name").subscribe();
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic4");
+                .topic("persistent://my-property/my-ns/my-topic4");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -272,10 +272,10 @@ public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
     public void testSendTimeout(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic5")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic5")
                 .subscriptionName("my-subscriber-name").subscribe();
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
+                .topic("persistent://my-property/my-ns/my-topic5").sendTimeout(1, TimeUnit.SECONDS);
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true);
@@ -314,7 +314,7 @@ public void testInvalidSequence() throws Exception {
         client1.close();
 
         try {
-            client1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic6")
+            client1.newConsumer().topic("persistent://my-property/my-ns/my-topic6")
                     .subscriptionName("my-subscriber-name").subscribe();
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
@@ -322,13 +322,13 @@ public void testInvalidSequence() throws Exception {
         }
 
         try {
-            client1.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
+            client1.newProducer().topic("persistent://my-property/my-ns/my-topic6").create();
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
         }
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic6")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic6")
                 .subscriptionName("my-subscriber-name").subscribe();
 
         try {
@@ -354,7 +354,7 @@ public void testInvalidSequence() throws Exception {
             // ok
         }
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic6")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic6")
                 .create();
         producer.close();
 
@@ -419,7 +419,7 @@ public void testSillyUser() {
         }
 
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName(null)
+            pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName(null)
                     .subscribe();
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
@@ -427,7 +427,7 @@ public void testSillyUser() {
         }
 
         try {
-            pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName("")
+            pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic7").subscriptionName("")
                     .subscribe();
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
@@ -452,7 +452,7 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
 
         String subName = UUID.randomUUID().toString();
         final Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic7").subscriptionName(subName)
+                .topic("persistent://my-property/my-ns/my-topic7").subscriptionName(subName)
                 .receiverQueueSize(recvQueueSize).subscribe();
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -478,7 +478,7 @@ public Void call() throws Exception {
 
         // publish 100 messages so that the consumers blocked on receive() will now get the messages
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic7");
+                .topic("persistent://my-property/my-ns/my-topic7");
 
         if (batchMessageDelayMs != 0) {
             producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
@@ -559,7 +559,7 @@ public void testSendBigMessageSize() throws Exception {
         MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize]).build();
 
         try {
-            final String topic = "persistent://my-property/use/my-ns/bigMsg";
+            final String topic = "persistent://my-property/my-ns/bigMsg";
             Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
             Message<byte[]> message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1])
                     .build();
@@ -588,7 +588,7 @@ public void testSendBigMessageSize() throws Exception {
     public void testSendBigMessageSizeButCompressed() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topic = "persistent://my-property/use/my-ns/bigMsg";
+        final String topic = "persistent://my-property/my-ns/bigMsg";
 
         // (a) non-batch msg with compression
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4)
@@ -657,9 +657,9 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
         /************ usecase-1: *************/
         // 1. Subscriber Faster subscriber
         Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub1)
+                .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1)
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
-        final String topic = "persistent://my-property/use/my-ns/" + topicName;
+        final String topic = "persistent://my-property/my-ns/" + topicName;
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
 
         if (batchMessageDelayMs != 0) {
@@ -708,7 +708,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
         /************ usecase-2: *************/
         // 1.b Subscriber slower-subscriber
         Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub2).subscribe();
+                .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2).subscribe();
         // Produce messages
         final int moreMessages = 10;
         for (int i = 0; i < receiverSize + moreMessages; i++) {
@@ -751,17 +751,17 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         final long batchMessageDelayMs = 100;
         final int receiverSize = 10;
         final String topicName = "cache-topic";
-        final String topic = "persistent://my-property/use/my-ns/" + topicName;
+        final String topic = "persistent://my-property/my-ns/" + topicName;
         final String sub1 = "faster-sub1";
         final String sub2 = "slower-sub2";
 
         // 1. Subscriber Faster subscriber: let it consume all messages immediately
         Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub1)
+                .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1)
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
         // 1.b. Subscriber Slow subscriber:
         Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName).subscriptionName(sub2)
+                .topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2)
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
@@ -835,11 +835,11 @@ public void testAsyncProducerAndConsumer() throws Exception {
         final Set<String> produceMsgs = Sets.newHashSet();
         final Set<String> consumeMsgs = Sets.newHashSet();
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
         // produce message
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
@@ -873,12 +873,12 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
         final int totalMsg = 100;
         final Set<String> produceMsgs = Sets.newHashSet();
         final Set<String> consumeMsgs = Sets.newHashSet();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
         ;
 
         // produce message
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
@@ -910,7 +910,7 @@ public void testSendCallBack() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 100;
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         for (int i = 0; i < totalMsg; i++) {
             final String message = "my-message-" + i;
@@ -940,13 +940,13 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic1").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
                 .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
                 .acknowledmentGroupTime(0, TimeUnit.SECONDS);
         Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
         Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -1044,11 +1044,11 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1125,12 +1125,12 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
             final int totalReceiveIteration = totalProducedMsgs / unAckedMessagesBufferSize;
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
                     .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1199,12 +1199,12 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
             ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
             Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
             Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1289,12 +1289,12 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
             final int totalProducedMsgs = 100;
 
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS)
                     .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1346,11 +1346,11 @@ public void testUnackBlockRedeliverMessages() throws Exception {
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1413,11 +1413,11 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
             Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             ProducerBuilder<byte[]> producerBuidler = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic");
+                    .topic("persistent://my-property/my-ns/unacked-topic");
 
             if (batchMessageDelayMs != 0) {
                 producerBuidler.enableBatching(true);
@@ -1506,13 +1506,13 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
             ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared);
             Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
             Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1586,11 +1586,11 @@ public void testEnabledChecksumClient() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final int totalMsg = 10;
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/my-topic1");
+                .topic("persistent://my-property/my-ns/my-topic1");
         final int batchMessageDelayMs = 300;
         if (batchMessageDelayMs != 0) {
             producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
@@ -1638,11 +1638,11 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
 
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1721,12 +1721,12 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
             pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
             // Only subscribe consumer
             ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
             consumer.close();
 
             Producer<byte[]> producer = pulsarClient.newProducer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").create();
+                    .topic("persistent://my-property/my-ns/unacked-topic").create();
 
             // (1) Produced Messages
             for (int i = 0; i < totalProducedMsgs; i++) {
@@ -1737,7 +1737,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
 
             // (1.a) start consumer again
             consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                    .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                    .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                     .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();
 
             // (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
@@ -1792,14 +1792,14 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).priorityLevel(1);
 
         Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
         Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
         Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
         Consumer<byte[]> consumer4 = consumerBuilder.clone().priorityLevel(2).subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
                 .create();
         List<Future<MessageId>> futures = Lists.newArrayList();
 
@@ -1862,12 +1862,12 @@ public void testSharedSamePriorityConsumer() throws Exception {
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
 
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
                 .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
                 .acknowledmentGroupTime(0, TimeUnit.SECONDS);
         Consumer<byte[]> c1 = consumerBuilder.subscribe();
         Consumer<byte[]> c2 = consumerBuilder.subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
                 .create();
         List<Future<MessageId>> futures = Lists.newArrayList();
 
@@ -1963,11 +1963,11 @@ public void testRedeliveryFailOverConsumer() throws Exception {
 
         // Only subscribe consumer
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/unacked-topic").subscriptionName("subscriber-1")
+                .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
                 .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
                 .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
                 .create();
 
         // (1) First round to produce-consume messages
@@ -2041,7 +2041,7 @@ public void testFailReceiveAsyncOnConsumerClose() throws Exception {
 
         // (1) simple consumers
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name")
                 .subscribe();
         consumer.close();
         // receive messages
@@ -2054,7 +2054,7 @@ public void testFailReceiveAsyncOnConsumerClose() throws Exception {
 
         // (2) Partitioned-consumer
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive");
         admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions);
         Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
@@ -2116,11 +2116,11 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         Set<String> messageSet = Sets.newHashSet();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/myecdsa-topic1").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/myecdsa-topic1").subscriptionName("my-subscriber-name")
                 .cryptoKeyReader(new EncKeyReader()).subscribe();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem")
+                .topic("persistent://my-property/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem")
                 .cryptoKeyReader(new EncKeyReader()).create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
@@ -2186,12 +2186,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         final int totalMsg = 10;
 
         Set<String> messageSet = Sets.newHashSet();
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
-        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1")
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
 
         for (int i = 0; i < totalMsg; i++) {
@@ -2341,7 +2341,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
     @Test
     public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        String topicName = "persistent://my-property/use/my-ns/test-subscription-initialize-topic";
+        String topicName = "persistent://my-property/my-ns/test-subscription-initialize-topic";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topicName)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 223a40b49..b4ce2fcf5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.client.api;
 
+import com.google.common.collect.Sets;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -46,7 +48,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr
                                                              DispatchRateType dispatchRateType) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
         final String subName = "my-subscriber-name";
 
@@ -58,7 +60,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr
             dispatchRate = new DispatchRate(-1, messageRate, 360);
         }
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
         // create producer, topic and consumer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
@@ -134,13 +136,13 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
         final String subName = "my-subscriber-name";
 
         final int messageRate = 10;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
         final int numProducedMessages = 30;
         final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -219,13 +221,13 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
     public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
         final String subName = "my-subscriber-name";
 
         final int byteRate = 100;
         DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
         final int numProducedMessages = 30;
         final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -299,13 +301,13 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
     public void testRateLimitingMultipleConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";
         final String subName = "my-subscriber-name";
 
         final int messageRate = 5;
         DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
 
         final int numProducedMessages = 500;
@@ -381,7 +383,7 @@ public void testRateLimitingMultipleConsumers() throws Exception {
     public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingBlock";
         final String subName = "my-subscriber-name";
 
@@ -401,7 +403,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
         }
         Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), initValue);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -452,7 +454,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
     public void testClusterPolicyOverrideConfiguration() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String namespace = "my-property/use/throttling_ns";
+        final String namespace = "my-property/throttling_ns";
         final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
         final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
         final String subName1 = "my-subscriber-name1";
@@ -474,7 +476,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
         }
         Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), initValue);
 
-        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
 
         // create producer and topic
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1).create();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index 0c850216e..3c06dc609 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -92,6 +92,6 @@ protected void internalSetUpForNamespace() throws Exception {
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+        admin.namespaces().createNamespace("my-property/my-ns");
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 9891d0bbf..3097a93bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -60,10 +60,10 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testSimpleReader() throws Exception {
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
+        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
                 .startMessageId(MessageId.earliest).create();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReader")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -88,14 +88,14 @@ public void testSimpleReader() throws Exception {
 
     @Test
     public void testReaderAfterMessagesWerePublished() throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
+        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -116,17 +116,17 @@ public void testReaderAfterMessagesWerePublished() throws Exception {
 
     @Test
     public void testMultipleReaders() throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
+        Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
-        Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
+        Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -157,7 +157,7 @@ public void testMultipleReaders() throws Exception {
 
     @Test
     public void testTopicStats() throws Exception {
-        String topicName = "persistent://my-property/use/my-ns/testTopicStats";
+        String topicName = "persistent://my-property/my-ns/testTopicStats";
 
         Reader<byte[]> reader1 = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
@@ -178,14 +178,14 @@ public void testTopicStats() throws Exception {
 
     @Test
     public void testReaderOnLastMessage() throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
+        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnLastMessage")
                 .startMessageId(MessageId.latest).create();
 
         for (int i = 10; i < 20; i++) {
@@ -213,7 +213,7 @@ public void testReaderOnLastMessage() throws Exception {
 
     @Test
     public void testReaderOnSpecificMessage() throws Exception {
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
                 .create();
         List<MessageId> messageIds = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
@@ -221,7 +221,7 @@ public void testReaderOnSpecificMessage() throws Exception {
             messageIds.add(producer.send(message.getBytes()));
         }
 
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
+        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
                 .startMessageId(messageIds.get(4)).create();
 
         // Publish more messages and verify the readers only sees messages starting from the intended message
@@ -244,7 +244,7 @@ public void testReaderOnSpecificMessage() throws Exception {
     @Test
     public void testReaderOnSpecificMessageWithBatches() throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches").enableBatching(true)
+                .topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").enableBatching(true)
                 .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -254,7 +254,7 @@ public void testReaderOnSpecificMessageWithBatches() throws Exception {
         // Write one sync message to ensure everything before got persistend
         producer.send("my-message-10".getBytes());
         Reader<byte[]> reader1 = pulsarClient.newReader()
-                .topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches")
+                .topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
                 .startMessageId(MessageId.earliest).create();
 
         MessageId lastMessageId = null;
@@ -268,7 +268,7 @@ public void testReaderOnSpecificMessageWithBatches() throws Exception {
         System.out.println("CREATING READER ON MSG ID: " + lastMessageId);
 
         Reader<byte[]> reader2 = pulsarClient.newReader()
-                .topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessageWithBatches")
+                .topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
                 .startMessageId(lastMessageId).create();
 
         for (int i = 5; i < 11; i++) {
@@ -328,11 +328,11 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
 
         Set<String> messageSet = Sets.newHashSet();
         Reader<byte[]> reader = pulsarClient.newReader()
-                .topic("persistent://my-property/use/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
+                .topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
                 .cryptoKeyReader(new EncKeyReader()).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/test-reader-myecdsa-topic1")
+                .topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1")
                 .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new EncKeyReader()).create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
@@ -355,9 +355,9 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
 
     @Test
     public void testSimpleReaderReachEndOfTopic() throws Exception {
-        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
+        Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
                 .startMessageId(MessageId.earliest).create();
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
                 .create();
 
         // no data write, should return false
@@ -411,11 +411,11 @@ public void testSimpleReaderReachEndOfTopic() throws Exception {
     @Test
     public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
         Reader<byte[]> reader = pulsarClient.newReader()
-                .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+                .topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .startMessageId(MessageId.earliest).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+                .topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create();
 
         // no data write, should return false
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java
new file mode 100644
index 000000000..d6fb9ef1d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerBase.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v1;
+
+import com.google.common.collect.Sets;
+
+import java.lang.reflect.Method;
+import java.util.Set;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class V1_ProducerConsumerBase extends MockedPulsarServiceBaseTest { // shared test base class
+    protected String methodName; // name of the current test method; assigned in beforeMethod()
+    /** Stores the name of the given method in {@code methodName}; registered as a TestNG {@code @BeforeMethod}. */
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+    /** Creates cluster "use", property "my-property" and namespace "my-property/use/my-ns" through {@code admin}. */
+    public void producerBaseSetup() throws Exception {
+        admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+    /** Asserts receivedMessage equals expectedMessage, then adds it to messagesReceived and fails on a duplicate. */
+    protected void testMessageOrderAndDuplicates(Set<String> messagesReceived, String receivedMessage,
+            String expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 74491d7a4..80ac52068 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -28,6 +28,9 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -63,7 +66,6 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -80,14 +82,11 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * Basic tests using the deprecated client APIs from Pulsar-1.x
  */
 @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
-public class V1_ProducerConsumerTest extends ProducerConsumerBase {
+public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
 
     @BeforeMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 584fc86da..2156bb38f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -119,10 +119,10 @@ protected void cleanup() throws Exception {
     @Test
     public void testDisconnectClientWithoutClosingConnection() throws Exception {
 
-        final String ns1 = "my-property/use/con-ns1";
-        final String ns2 = "my-property/use/con-ns2";
-        admin.namespaces().createNamespace(ns1);
-        admin.namespaces().createNamespace(ns2);
+        final String ns1 = "my-property/con-ns1";
+        final String ns2 = "my-property/con-ns2";
+        admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(ns2, Sets.newHashSet("test"));
 
         final String topic1 = "persistent://" + ns1 + "/my-topic";
         final String topic2 = "persistent://" + ns2 + "/my-topic";
@@ -239,10 +239,10 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {
     @Test
     public void testCloseBrokerService() throws Exception {
 
-        final String ns1 = "my-property/use/brok-ns1";
-        final String ns2 = "my-property/use/brok-ns2";
-        admin.namespaces().createNamespace(ns1);
-        admin.namespaces().createNamespace(ns2);
+        final String ns1 = "my-property/brok-ns1";
+        final String ns2 = "my-property/brok-ns2";
+        admin.namespaces().createNamespace(ns1, Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(ns2, Sets.newHashSet("test"));
 
         final String topic1 = "persistent://" + ns1 + "/my-topic";
         final String topic2 = "persistent://" + ns2 + "/my-topic";
@@ -293,7 +293,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
         log.info("-- Starting {} test --", methodName);
 
         final int batchMessageDelayMs = 1000;
-        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
         final String subscriptionName = "my-subscriber-name" + subType;
 
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
@@ -367,7 +367,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
     @Test(timeOut = 10000, dataProvider = "subType")
     public void testResetCursor(SubscriptionType subType) throws Exception {
         final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
-        final TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/unacked-topic");
+        final TopicName topicName = TopicName.get("persistent://my-property/my-ns/unacked-topic");
         final int warmup = 20;
         final int testSize = 150;
         final List<Message<byte[]>> received = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index af5e80401..85dbb245b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -23,16 +23,12 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-import java.io.IOException;
-
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
-
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -53,9 +49,7 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testCompactedOutMessages() throws Exception {
-        final String ns1 = "my-property/use/con-ns1";
-        admin.namespaces().createNamespace(ns1);
-        final String topic1 = "persistent://" + ns1 + "/my-topic";
+        final String topic1 = "persistent://my-property/my-ns/my-topic";
 
         MessageMetadata metadata = MessageMetadata.newBuilder().setProducerName("foobar")
             .setSequenceId(1).setPublishTime(1).setNumMessagesInBatch(3).build();
@@ -82,7 +76,7 @@ public void testCompactedOutMessages() throws Exception {
             consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer,
                                                         MessageIdData.newBuilder().setLedgerId(1234)
                                                         .setEntryId(567).build(), consumer.cnx());
-            Message m = consumer.receive();
+            Message<?> m = consumer.receive();
             assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 1234);
             assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 567);
             assertEquals(((BatchMessageIdImpl)m.getMessageId()).getBatchIndex(), 2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index a71fc2019..23bf60a4c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -43,7 +43,7 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testClosedConsumer() throws PulsarClientException {
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop/cluster/ns/topicName")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
                 .subscriptionName("my-subscription").subscribe();
         consumer.close();
         Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
@@ -61,7 +61,7 @@ public void testClosedConsumer() throws PulsarClientException {
     @Test
     public void testListener() throws PulsarClientException {
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop/cluster/ns/topicName")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
                 .subscriptionName("my-subscription").messageListener((consumer1, msg) -> {
 
                 }).subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index b3e74f7b9..9624e8e54 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -57,6 +57,7 @@ public void setup() throws Exception {
         // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
         isTcpLookup = true;
         super.internalSetup();
+        super.producerBaseSetup();
     }
 
     @Override
@@ -69,11 +70,11 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
         String key = "PatternTopicsSubscribeWithBuilderFail";
         final String subscriptionName = "my-ex-subscription-" + key;
 
-        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
-        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
-        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        final String topicName1 = "persistent://my-property/my-ns/topic-1-" + key;
+        final String topicName2 = "persistent://my-property/my-ns/topic-2-" + key;
+        final String topicName3 = "persistent://my-property/my-ns/topic-3-" + key;
         List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
-        final String patternString = "persistent://prop/use/ns-abc/pattern-topic.*";
+        final String patternString = "persistent://my-property/my-ns/pattern-topic.*";
         Pattern pattern = Pattern.compile(patternString);
 
         admin.properties().createProperty("prop", new PropertyAdmin());
@@ -128,10 +129,10 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
     public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
         String key = "BinaryProtoToGetTopics";
         String subscriptionName = "my-ex-subscription-" + key;
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
-        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
-        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
 
         // 1. create partition
         admin.properties().createProperty("prop", new PropertyAdmin());
@@ -205,17 +206,17 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
 
     @Test(timeOut = testTimeout)
     public void testTopicsPatternFilter() throws Exception {
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
-        String topicName3 = "persistent://prop/use/ns-abc/hello-3";
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
+        String topicName3 = "persistent://my-property/my-ns/hello-3";
 
         List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3);
 
-        Pattern pattern1 = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
         List<String> result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1);
         assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2));
 
-        Pattern pattern2 = Pattern.compile("persistent://prop/use/ns-abc/.*");
+        Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*");
         List<String> result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2);
         assertTrue(result2.size() == 3 &&
             result2.contains(topicName1) &&
@@ -225,12 +226,12 @@ public void testTopicsPatternFilter() throws Exception {
 
     @Test(timeOut = testTimeout)
     public void testTopicsListMinus() throws Exception {
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
-        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3";
-        String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4";
-        String topicName5 = "persistent://prop/use/ns-abc/pattern-topic-5";
-        String topicName6 = "persistent://prop/use/ns-abc/pattern-topic-6";
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3";
+        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4";
+        String topicName5 = "persistent://my-property/my-ns/pattern-topic-5";
+        String topicName6 = "persistent://my-property/my-ns/pattern-topic-6";
 
         List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
         List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);
@@ -269,10 +270,10 @@ public void testTopicsListMinus() throws Exception {
     public void testStartEmptyPatternConsumer() throws Exception {
         String key = "StartEmptyPatternConsumerTest";
         String subscriptionName = "my-ex-subscription-" + key;
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
-        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
-        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
 
         // 1. create partition
         admin.properties().createProperty("prop", new PropertyAdmin());
@@ -352,10 +353,10 @@ public void testStartEmptyPatternConsumer() throws Exception {
     public void testAutoSubscribePatternConsumer() throws Exception {
         String key = "AutoSubscribePatternConsumer";
         String subscriptionName = "my-ex-subscription-" + key;
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
-        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
-        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
 
         // 1. create partition
         admin.properties().createProperty("prop", new PropertyAdmin());
@@ -411,7 +412,7 @@ public void testAutoSubscribePatternConsumer() throws Exception {
         assertEquals(messageSet, totalMessages);
 
         // 6. create another producer with 4 partitions
-        String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4-" + key;
+        String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" + key;
         admin.persistentTopics().createPartitionedTopic(topicName4, 4);
         Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4)
             .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
@@ -455,10 +456,10 @@ public void testAutoSubscribePatternConsumer() throws Exception {
     public void testAutoUnbubscribePatternConsumer() throws Exception {
         String key = "AutoUnsubscribePatternConsumer";
         String subscriptionName = "my-ex-subscription-" + key;
-        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
-        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
-        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
-        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
+        String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
+        String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
 
         // 1. create partition
         admin.properties().createProperty("prop", new PropertyAdmin());
@@ -517,7 +518,7 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {
         // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
         List<String> topicNames = Lists.newArrayList(topicName2);
         NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("prop", "use", "ns-abc"));
+        doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("my-property/my-ns"));
 
         // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
         log.debug("recheck topics change");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 23306ba03..e6adf5c90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -31,7 +31,6 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -47,7 +46,7 @@
     @Override
     @BeforeMethod
     public void setup() throws Exception {
-        super.internalSetup();
+        super.baseSetup();
     }
 
     @Override
@@ -59,7 +58,7 @@ public void cleanup() throws Exception {
     @Test(timeOut = testTimeout)
     public void testExclusiveSingleAckedNormalTopic() throws Exception {
         String key = "testExclusiveSingleAckedNormalTopic";
-        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String topicName = "persistent://prop/ns-abc/topic-" + key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
@@ -157,12 +156,11 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
     @Test(timeOut = testTimeout)
     public void testSharedSingleAckedPartitionedTopic() throws Exception {
         String key = "testSharedSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String topicName = "persistent://prop/ns-abc/topic-" + key;
         final String subscriptionName = "my-shared-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 20;
         final int numberOfPartitions = 3;
-        admin.properties().createProperty("prop", new PropertyAdmin());
         admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions);
         // Special step to create partitioned topic
 
@@ -244,12 +242,11 @@ private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages)
     @Test(timeOut = testTimeout)
     public void testFailoverSingleAckedPartitionedTopic() throws Exception {
         String key = "testFailoverSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String topicName = "persistent://prop/ns-abc/topic-" + key;
         final String subscriptionName = "my-failover-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 10;
         final int numberOfPartitions = 3;
-        admin.properties().createProperty("prop", new PropertyAdmin());
         admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions);
         // Special step to create partitioned topic
 
@@ -338,7 +335,7 @@ public void testAckTimeoutMinValue() throws PulsarClientException {
     @Test(timeOut = testTimeout)
     public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, InterruptedException {
         String key = "testCheckUnAcknowledgedMessageTimer";
-        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String topicName = "persistent://prop/ns-abc/topic-" + key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 3;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 074571500..ce371912e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -102,7 +102,7 @@ public void testServiceException() throws Exception {
     public void testTopicInternalStats() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
         final String subscriptionName = "my-subscriber-name";
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index 81520fa91..eab642886 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -68,7 +68,7 @@ public void setup() throws Exception {
         port = PortManager.nextFreePort();
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
         config.setWebServicePort(port);
-        config.setClusterName("use");
+        config.setClusterName("test");
         config.setAuthenticationEnabled(true);
         config.setGlobalZookeeperServers("dummy-zk-servers");
         config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
@@ -118,9 +118,9 @@ protected void cleanup() throws Exception {
     }
 
     public void socketTest() throws Exception {
-        final String topic = "my-property/use/my-ns/my-topic1";
-        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub";
-        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+        final String topic = "my-property/my-ns/my-topic1";
+        final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/my-sub";
+        final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic;
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
 
@@ -169,9 +169,9 @@ public void unauthenticatedSocketTest() throws Exception{
 
     @Test(timeOut=10000)
     public void statsTest() throws Exception {
-        final String topic = "my-property/use/my-ns/my-topic2";
-        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub";
-        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+        final String topic = "persistent/my-property/my-ns/my-topic2";
+        final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/" + topic + "/my-sub";
+        final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/" + topic;
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
 
@@ -180,7 +180,7 @@ public void statsTest() throws Exception {
         WebSocketClient produceClient = new WebSocketClient();
         SimpleProducerSocket produceSocket = new SimpleProducerSocket();
 
-        final String baseUrl = "http://localhost:" + port + "/admin/proxy-stats/";
+        final String baseUrl = "http://localhost:" + port + "/admin/v2/proxy-stats/";
         Client client = ClientBuilder.newClient();
 
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 3c401733a..1e5409c79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -22,6 +22,10 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -62,9 +66,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
 public class ProxyPublishConsumeTest extends ProducerConsumerBase {
     protected String methodName;
     private int port;
@@ -84,7 +85,7 @@ public void setup() throws Exception {
         port = PortManager.nextFreePort();
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
         config.setWebServicePort(port);
-        config.setClusterName("use");
+        config.setClusterName("test");
         config.setGlobalZookeeperServers("dummy-zk-servers");
         service = spy(new WebSocketService(config));
         doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
@@ -105,9 +106,9 @@ protected void cleanup() throws Exception {
     @Test(timeOut = 10000)
     public void socketTest() throws Exception {
         final String consumerUri = "ws://localhost:" + port
-                + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
-        String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1";
-        String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/";
+                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
+        String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1";
+        String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
 
         URI consumeUri = URI.create(consumerUri);
         URI readUri = URI.create(readerUri);
@@ -183,7 +184,7 @@ public void emptySubcriptionConsumerTest() throws Exception {
 
         // Empty subcription name
         final String consumerUri = "ws://localhost:" + port
-                + "/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive";
+                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic2/?subscriptionType=Exclusive";
         URI consumeUri = URI.create(consumerUri);
 
         WebSocketClient consumeClient1 = new WebSocketClient();
@@ -208,7 +209,7 @@ public void emptySubcriptionConsumerTest() throws Exception {
     @Test(timeOut = 10000)
     public void conflictingConsumerTest() throws Exception {
         final String consumerUri = "ws://localhost:" + port
-                + "/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
+                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
         URI consumeUri = URI.create(consumerUri);
 
         WebSocketClient consumeClient1 = new WebSocketClient();
@@ -244,7 +245,7 @@ public void conflictingConsumerTest() throws Exception {
     @Test(timeOut = 10000)
     public void conflictingProducerTest() throws Exception {
         final String producerUri = "ws://localhost:" + port
-                + "/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer";
+                + "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
         URI produceUri = URI.create(producerUri);
 
         WebSocketClient produceClient1 = new WebSocketClient();
@@ -277,16 +278,18 @@ public void conflictingProducerTest() throws Exception {
         }
     }
 
-    @Test(timeOut = 30000)
+    @Test// (timeOut = 30000)
     public void producerBacklogQuotaExceededTest() throws Exception {
-        admin.namespaces().createNamespace("my-property/use/ns-ws-quota");
-        admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota",
+        String namespace = "my-property/ns-ws-quota";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setBacklogQuota(namespace,
                 new BacklogQuota(10, BacklogQuota.RetentionPolicy.producer_request_hold));
 
-        final String topic = "my-property/use/ns-ws-quota/my-topic5";
+        final String topic = namespace + "/my-topic5";
         final String subscription = "my-sub";
-        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/" + subscription;
-        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+        final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+        final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic;
 
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
@@ -338,8 +341,8 @@ public void producerBacklogQuotaExceededTest() throws Exception {
             stopWebSocketClient(produceClient2);
             admin.persistentTopics().skipAllMessages("persistent://" + topic, subscription);
             admin.persistentTopics().delete("persistent://" + topic);
-            admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota");
-            admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota");
+            admin.namespaces().removeBacklogQuota(namespace);
+            admin.namespaces().deleteNamespace(namespace);
         }
     }
 
@@ -350,11 +353,11 @@ public void producerBacklogQuotaExceededTest() throws Exception {
      */
     @Test(timeOut = 10000)
     public void testProxyStats() throws Exception {
-        final String topic = "my-property/use/my-ns/my-topic6";
-        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic
+        final String topic = "my-property/my-ns/my-topic6";
+        final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic
                 + "/my-sub?subscriptionType=Failover";
-        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/";
-        final String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/" + topic;
+        final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/";
+        final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic;
         System.out.println(consumerUri+", "+producerUri);
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
@@ -405,7 +408,7 @@ public void testProxyStats() throws Exception {
             Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
             final String baseUrl = pulsar.getWebServiceAddress()
                     .replace(Integer.toString(pulsar.getConfiguration().getWebServicePort()), (Integer.toString(port)))
-                    + "/admin/proxy-stats/";
+                    + "/admin/v2/proxy-stats/";
 
             // verify proxy metrics
             verifyProxyMetrics(client, baseUrl);
@@ -414,7 +417,7 @@ public void testProxyStats() throws Exception {
             verifyProxyStats(client, baseUrl, topic);
 
             // verify topic stat
-            verifyTopicStat(client, baseUrl, topic);
+            verifyTopicStat(client, baseUrl + "persistent/", topic);
 
         } finally {
             stopWebSocketClient(consumeClient1, produceClient);
@@ -443,6 +446,8 @@ private void verifyProxyMetrics(Client client, String baseUrl) {
         Response response = (Response) invocationBuilder.get();
         String responseStr = response.readEntity(String.class);
         final Gson gson = new Gson();
+        System.err.println("REQ: " + statUrl);
+        System.err.println("RESPONSE: " + responseStr);
         final List<Metrics> data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
         }.getType());
         Assert.assertFalse(data.isEmpty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 7e78c1224..0f70cc2e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -57,7 +57,7 @@ public void setup() throws Exception {
         port = PortManager.nextFreePort();
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
         config.setWebServicePort(port);
-        config.setClusterName("use");
+        config.setClusterName("test");
         config.setServiceUrl(pulsar.getWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spy(new WebSocketService(config));
@@ -77,10 +77,10 @@ protected void cleanup() throws Exception {
 
     @Test(timeOut=30000)
     public void socketTest() throws Exception {
-        
-        String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
-        String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
-        
+
+        String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-sub";
+        String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/my-property/my-ns/my-topic/";
+
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
 
@@ -101,7 +101,7 @@ public void socketTest() throws Exception {
             // let it connect
             Assert.assertTrue(consumerFuture.get().isOpen());
             Assert.assertTrue(producerFuture.get().isOpen());
-            
+
             while (consumeSocket.getReceivedMessagesCount() < 10) {
                 Thread.sleep(10);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
new file mode 100644
index 000000000..fbe6e07bd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy.v1;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.client.api.v1.V1_ProducerConsumerBase;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.proxy.SimpleConsumerSocket;
+import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
+
+    private int port;
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+    private WebSocketClient consumeClient;
+    private WebSocketClient produceClient;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        port = PortManager.nextFreePort();
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(port);
+        config.setClusterName("use");
+        config.setAuthenticationEnabled(true);
+        config.setGlobalZookeeperServers("dummy-zk-servers");
+        config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
+
+        // If the global ZooKeeper servers are not set, a 500 error occurs.
+        config.setGlobalZookeeperServers("dummy");
+
+        if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) {
+            config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider"));
+        } else {
+            config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockUnauthenticationProvider"));
+        }
+        if (methodName.equals("anonymousSocketTest")) {
+            config.setAnonymousUserRole("anonymousUser");
+        }
+
+        service = spy(new WebSocketService(config));
+        doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        ExecutorService executor = newFixedThreadPool(1);
+        try {
+            executor.submit(() -> {
+                try {
+                    consumeClient.stop();
+                    produceClient.stop();
+                    log.info("proxy clients are stopped successfully");
+                } catch (Exception e) {
+                    log.error(e.getMessage());
+                }
+            }).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close clients ", e);
+        }
+        executor.shutdownNow();
+
+        super.internalCleanup();
+        service.close();
+        proxyServer.stop();
+        log.info("Finished Cleaning Up Test setup");
+
+    }
+
+    public void socketTest() throws Exception {
+        final String topic = "prop/use/my-ns/my-topic1";
+        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub";
+        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        consumeClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        consumeClient.start();
+        ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+        Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+        log.info("Connecting to : {}", consumeUri);
+
+        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+        produceClient.start();
+        Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+        Assert.assertTrue(consumerFuture.get().isOpen());
+        Assert.assertTrue(producerFuture.get().isOpen());
+
+        consumeSocket.awaitClose(1, TimeUnit.SECONDS);
+        produceSocket.awaitClose(1, TimeUnit.SECONDS);
+        Assert.assertTrue(produceSocket.getBuffer().size() > 0);
+        Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+    }
+
+    @Test(timeOut=10000)
+    public void authenticatedSocketTest() throws Exception {
+        socketTest();
+    }
+
+    @Test(timeOut=10000)
+    public void anonymousSocketTest() throws Exception {
+        socketTest();
+    }
+
+    @Test(timeOut=10000)
+    public void unauthenticatedSocketTest() throws Exception{
+        Exception exception = null;
+        try {
+            socketTest();
+        } catch (Exception e) {
+            exception = e;
+        }
+        Assert.assertTrue(exception instanceof java.util.concurrent.ExecutionException);
+    }
+
+    @Test(timeOut=10000)
+    public void statsTest() throws Exception {
+        final String topic = "prop/use/my-ns/my-topic2";
+        final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub";
+        final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        final String baseUrl = "http://localhost:" + port + "/admin/proxy-stats/";
+        Client client = ClientBuilder.newClient();
+
+        try {
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            Assert.assertTrue(consumerFuture.get().isOpen());
+
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            Assert.assertTrue(producerFuture.get().isOpen());
+
+            int retry = 0;
+            int maxRetry = 500;
+            while (consumeSocket.getReceivedMessagesCount() < 3) {
+                Thread.sleep(10);
+                if (retry++ > maxRetry) {
+                    break;
+                }
+            }
+
+            service.getProxyStats().generate();
+
+            verifyResponseStatus(client, baseUrl + "metrics");
+            verifyResponseStatus(client, baseUrl + "stats");
+            verifyResponseStatus(client, baseUrl + topic + "/stats");
+        } finally {
+            consumeClient.stop();
+            produceClient.stop();
+            client.close();
+        }
+    }
+
+    private void verifyResponseStatus(Client client, String url) {
+        WebTarget webTarget = client.target(url);
+        Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
+        Response response = (Response) invocationBuilder.get();
+        Assert.assertEquals(response.getStatus(), 200);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(V1_ProxyAuthenticationTest.class);
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bc78283b2..1f0c761ee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -214,6 +214,28 @@
      */
     void createNamespace(String namespace) throws PulsarAdminException;
 
+    /**
+     * Create a new namespace.
+     * <p>
+     * Creates a new empty namespace with no policies attached other than the given replication clusters.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param clusters
+     *            Clusters in which the namespace will be present. If more than one cluster is present, replication
+     *            across clusters will be enabled.
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Property or cluster does not exist
+     * @throws ConflictException
+     *             Namespace already exists
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException;
+
     /**
      * Delete an existing namespace.
      * <p>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index 677dd46a9..6b3d0476b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -38,9 +38,9 @@ public LookupImpl(WebTarget web, Authentication auth, boolean useTls) {
         v2lookup = web.path("/lookup/v2");
     }
 
-    private PulsarAdminException getLookupApiException(Exception e) {
+    private PulsarAdminException getLookupApiException(WebTarget target, Exception e) {
         if (e instanceof ClientErrorException) {
-            return new PulsarAdminException((ClientErrorException) e, e.getMessage());
+            return new PulsarAdminException((ClientErrorException) e, e.getMessage() + " at " + target.getUri());
         } else {
             return getApiException(e);
         }
@@ -48,26 +48,32 @@ private PulsarAdminException getLookupApiException(Exception e) {
 
     @Override
     public String lookupTopic(String topic) throws PulsarAdminException {
+        TopicName topicName = TopicName.get(topic);
+        String prefix = topicName.isV2() ? "/topic" : "/destination";
+        WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName());
+
         try {
-            TopicName topicName = TopicName.get(topic);
-            return doTopicLookup(v2lookup.path("/destination"), topicName);
+            return doTopicLookup(target);
         } catch (Exception e) {
-            throw getLookupApiException(e);
+            throw getLookupApiException(target, e);
         }
     }
 
     @Override
     public String getBundleRange(String topic) throws PulsarAdminException {
+        TopicName topicName = TopicName.get(topic);
+        String prefix = topicName.isV2() ? "/topic" : "/destination";
+        WebTarget target = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
+
         try {
-            TopicName destName = TopicName.get(topic);
-            return request(v2lookup.path("/destination").path(destName.getLookupName()).path("bundle")).get(String.class);
+            return request(target).get(String.class);
         } catch (Exception e) {
-            throw getLookupApiException(e);
+            throw getLookupApiException(target, e);
         }
     }
 
-    private String doTopicLookup(WebTarget lookupResource, TopicName destName) throws PulsarAdminException {
-        LookupData lookupData = request(lookupResource.path(destName.getLookupName())).get(LookupData.class);
+    private String doTopicLookup(WebTarget lookupResource) throws PulsarAdminException {
+        LookupData lookupData = request(lookupResource).get(LookupData.class);
         if (useTls) {
             return lookupData.getBrokerUrlTls();
         } else {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index f48e81ea6..61fc9eebb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -79,7 +79,8 @@ public NamespacesImpl(WebTarget web, Authentication auth) {
     public List<String> getTopics(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "destinations");
+            String action = ns.isV2() ? "topics" : "destinations";
+            WebTarget path = namespacePath(ns, action);
             return request(path).get(new GenericType<List<String>>() {
             });
         } catch (Exception e) {
@@ -98,6 +99,28 @@ public Policies getPolicies(String namespace) throws PulsarAdminException {
         }
     }
 
+    @Override
+    public void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns);
+
+            if (ns.isV2()) {
+                // For the V2 API, we pass a full Policies instance
+                Policies policies = new Policies();
+                policies.replication_clusters = clusters;
+                request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
+            } else {
+                // For the V1 API, we create the namespace with an empty body (no BundlesData)
+                request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+                // For V1, we need to do it in 2 steps: create the namespace, then set its replication clusters
+                setNamespaceReplicationClusters(namespace, clusters);
+            }
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     @Override
     public void createNamespace(String namespace, int numBundles) throws PulsarAdminException {
         createNamespace(namespace, new BundlesData(numBundles));
@@ -108,7 +131,16 @@ public void createNamespace(String namespace, BundlesData bundlesData) throws Pu
         try {
             NamespaceName ns = NamespaceName.get(namespace);
             WebTarget path = namespacePath(ns);
-            request(path).put(Entity.entity(bundlesData, MediaType.APPLICATION_JSON), ErrorData.class);
+
+            if (ns.isV2()) {
+                // For the V2 API, we pass a full Policies instance
+                Policies policies = new Policies();
+                policies.bundles = bundlesData;
+                request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
+            } else {
+                // For V1 API, we pass the BundlesData on creation
+                request(path).put(Entity.entity(bundlesData, MediaType.APPLICATION_JSON), ErrorData.class);
+            }
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -433,9 +465,8 @@ public DispatchRate getDispatchRate(String namespace) throws PulsarAdminExceptio
     public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            request(adminNamespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
-                .path("subscriptionDispatchRate"))
-                .post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
+            WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+            request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -445,8 +476,8 @@ public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchR
     public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
-            return request(adminNamespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
-                .path("subscriptionDispatchRate")).get(DispatchRate.class);
+            WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+            return request(path).get(DispatchRate.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index a8ec61137..a5b045571 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -42,7 +42,9 @@
 
     private final HttpClient httpClient;
     private final boolean useTls;
-    private static final String BasePath = "lookup/v2/destination/";
+
+    private static final String BasePathV1 = "lookup/v2/destination/";
+    private static final String BasePathV2 = "lookup/v2/topic/";
 
     public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
@@ -59,7 +61,9 @@ public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopG
      */
     @SuppressWarnings("deprecation")
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
-        return httpClient.get(BasePath + topicName.getLookupName(), LookupData.class).thenCompose(lookupData -> {
+        String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
+
+        return httpClient.get(basePath + topicName.getLookupName(), LookupData.class).thenCompose(lookupData -> {
             // Convert LookupData into as SocketAddress, handling exceptions
         	URI uri = null;
             try {
@@ -84,8 +88,8 @@ public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopG
     }
 
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
-    	return httpClient.get(String.format("admin/%s/partitions", topicName.getLookupName()),
-                PartitionedTopicMetadata.class);
+        String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
+        return httpClient.get(String.format(format, topicName.getLookupName()), PartitionedTopicMetadata.class);
     }
 
     public String getServiceUrl() {
@@ -95,8 +99,10 @@ public String getServiceUrl() {
     @Override
     public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
         CompletableFuture<List<String>> future = new CompletableFuture<>();
+
+        String format = namespace.isV2() ? "admin/v2/namespaces/%s/destinations" : "admin/namespaces/%s/destinations";
         httpClient
-            .get(String.format("admin/namespaces/%s/destinations", namespace), String[].class)
+            .get(String.format(format, namespace), String[].class)
             .thenAccept(topics -> {
                 List<String> result = Lists.newArrayList();
                 // do not keep partition part of topic name
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index f069d021c..60351f493 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -61,7 +61,7 @@
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
-    private final String configClusterName = "use";
+    private final String configClusterName = "test";
 
     @BeforeMethod
     @Override
@@ -162,12 +162,12 @@ public void testTlsSyncProducerAndConsumer() throws Exception {
         admin.clusters().createCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
 
-        Consumer<byte[]> consumer = proxyClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Consumer<byte[]> consumer = proxyClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
-        Producer<byte[]> producer = proxyClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = proxyClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
                 .create();
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index fda29b252..f45013b79 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -65,15 +66,19 @@ protected void setup() throws Exception {
         providers.add(BasicAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName("test");
         Set<String> proxyRoles = new HashSet<String>();
         proxyRoles.add("proxy");
         conf.setProxyRoles(proxyRoles);
 
         super.init();
+
+        createAdminClient();
+        producerBaseSetup();
     }
 
     @Override
+    @AfterMethod
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -83,18 +88,16 @@ void testForwardAuthData() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         // Step 1: Create Admin Client
-        createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
         // create a client which connects to proxy and pass authData
-        String namespaceName = "my-property/use/my-ns";
-        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String namespaceName = "my-property/global/my-ns";
+        String topicName = "persistent://my-property/global/my-ns/my-topic1";
         String subscriptionName = "my-subscriber-name";
         String clientAuthParams = "authParam:client";
         String proxyAuthParams = "authParam:proxy";
 
-        admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
         admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
         admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
                 Sets.newHashSet(AuthAction.consume, AuthAction.produce));
         admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index a36566630..08c75fa6b 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -20,6 +20,8 @@
 
 import static org.mockito.Mockito.spy;
 
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,15 +42,13 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
 
@@ -162,15 +162,19 @@ protected void setup() throws Exception {
         providers.add(BasicAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName("test");
         Set<String> proxyRoles = new HashSet<String>();
         proxyRoles.add("proxy");
         conf.setProxyRoles(proxyRoles);
 
         super.init();
+
+        createAdminClient();
+        producerBaseSetup();
     }
 
     @Override
+    @AfterMethod
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -183,15 +187,14 @@ void testIncorrectRoles() throws Exception {
         createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
         // create a client which connects to proxy and pass authData
-        String namespaceName = "my-property/use/my-ns";
-        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String namespaceName = "my-property/global/my-ns";
+        String topicName = "persistent://my-property/global/my-ns/my-topic1";
         String subscriptionName = "my-subscriber-name";
         String clientAuthParams = "authParam:client";
         String proxyAuthParams = "authParam:proxy";
 
-        admin.properties().createProperty("my-property",
-                new PropertyAdmin(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
         admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet("test"));
 
         admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
                 Sets.newHashSet(AuthAction.consume, AuthAction.produce));
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 3c394df72..a62649d79 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -51,7 +51,7 @@
 public class PulsarSpoutTest extends ProducerConsumerBase {
 
     public final String serviceUrl = "http://127.0.0.1:" + BROKER_WEBSERVICE_PORT;
-    public final String topic = "persistent://my-property/use/my-ns/my-topic1";
+    public final String topic = "persistent://my-property/my-ns/my-topic1";
     public final String subscriptionName = "my-subscriber-name";
 
     protected PulsarSpoutConfiguration pulsarSpoutConf;
@@ -323,7 +323,7 @@ public void testSerializability() throws Exception {
         PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, new ClientConfiguration());
         TestUtil.testSerializability(spoutWithNoAuth);
     }
-    
+
     @Test
     public void testFailedConsumer() throws Exception {
         PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStatsBase.java
similarity index 57%
rename from pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java
rename to pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStatsBase.java
index 702cd655e..cbbbddcf4 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStatsBase.java
@@ -18,20 +18,13 @@
  */
 package org.apache.pulsar.websocket.admin;
 
-import static org.apache.pulsar.common.util.Codec.decode;
+import com.google.common.collect.Maps;
 
 import java.util.Collection;
 import java.util.Map;
 
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
-import com.google.common.collect.Maps;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.websocket.stats.ProxyTopicStat;
@@ -40,22 +33,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-
-@Path("/proxy-stats")
-@Api(value = "/proxy", description = "Stats for web-socket proxy", tags = "proxy-stats")
-@Produces(MediaType.APPLICATION_JSON)
-public class WebSocketProxyStats extends WebSocketWebResource {
-    private static final Logger LOG = LoggerFactory.getLogger(WebSocketProxyStats.class);
-
-    @GET
-    @Path("/metrics")
-    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public Collection<Metrics> getMetrics() throws Exception {
+public class WebSocketProxyStatsBase extends WebSocketWebResource {
+    private static final Logger LOG = LoggerFactory.getLogger(WebSocketProxyStatsBase.class);
+
+    protected Collection<Metrics> internalGetMetrics() throws Exception {
         // Ensure super user access only
         validateSuperUserAccess();
         try {
@@ -66,57 +47,44 @@
         }
     }
 
-    @GET
-    @Path("/{property}/{cluster}/{namespace}/{topic}/stats")
-    @ApiOperation(value = "Get the stats for the topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist") })
-    public ProxyTopicStat getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic) {
-        topic = decode(topic);
-        TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
+    protected ProxyTopicStat internalGetStats(TopicName topicName) {
         validateUserAccess(topicName);
-        ProxyTopicStat stats = getStat(topicName.toString());
+        ProxyTopicStat stats = getStat(topicName);
         if (stats == null) {
             throw new RestException(Status.NOT_FOUND, "Topic does not exist");
         }
         return stats;
     }
 
-    @GET
-    @Path("/stats")
-    @ApiOperation(value = "Get the stats for the topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public Map<String, ProxyTopicStat> getProxyStats() {
+    protected Map<String, ProxyTopicStat> internalGetProxyStats() {
         validateSuperUserAccess();
         return getStat();
     }
 
-    public ProxyTopicStat getStat(String topicName) {
-
-        if (!service().getProducers().containsKey(topicName)
-        		&& !service().getConsumers().containsKey(topicName)
-        		&& !service().getReaders().containsKey(topicName)) {
-            LOG.warn("topic doesn't exist {}", topicName);
+    private ProxyTopicStat getStat(TopicName topicName) {
+        String topicNameStr = topicName.toString();
+        if (!service().getProducers().containsKey(topicNameStr) && !service().getConsumers().containsKey(topicNameStr)
+                && !service().getReaders().containsKey(topicNameStr)) {
+            LOG.warn("topic doesn't exist {}", topicNameStr);
             throw new RestException(Status.NOT_FOUND, "Topic does not exist");
         }
         ProxyTopicStat topicStat = new ProxyTopicStat();
-        if (service().getProducers().containsKey(topicName)){
-            service().getProducers().get(topicName).forEach(handler -> {
+        if (service().getProducers().containsKey(topicNameStr)) {
+            service().getProducers().get(topicNameStr).forEach(handler -> {
                 ProducerStats stat = new ProducerStats(handler);
                 topicStat.producerStats.add(stat);
 
             });
         }
 
-        if (service().getConsumers().containsKey(topicName)){
-            service().getConsumers().get(topicName).forEach(handler -> {
+        if (service().getConsumers().containsKey(topicNameStr)) {
+            service().getConsumers().get(topicNameStr).forEach(handler -> {
                 topicStat.consumerStats.add(new ConsumerStats(handler));
             });
         }
 
-        if (service().getReaders().containsKey(topicName)){
-            service().getReaders().get(topicName).forEach(handler -> {
+        if (service().getReaders().containsKey(topicNameStr)) {
+            service().getReaders().get(topicNameStr).forEach(handler -> {
                 topicStat.consumerStats.add(new ConsumerStats(handler));
             });
         }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index 5f9e1c647..66050bc5b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -36,7 +36,8 @@
 public class WebSocketWebResource {
 
     public static final String ATTRIBUTE_PROXY_SERVICE_NAME = "webProxyService";
-    public static final String ADMIN_PATH = "/admin";
+    public static final String ADMIN_PATH_V1 = "/admin";
+    public static final String ADMIN_PATH_V2 = "/admin/v2";
 
     @Context
     protected ServletContext servletContext;
@@ -48,10 +49,10 @@
     protected UriInfo uri;
 
     private WebSocketService socketService;
-    
+
     private String clientId;
     private AuthenticationDataHttps authData;
-    
+
     protected WebSocketService service() {
         if (socketService == null) {
             socketService = (WebSocketService) servletContext.getAttribute(ATTRIBUTE_PROXY_SERVICE_NAME);
@@ -109,7 +110,7 @@ protected void validateSuperUserAccess() {
 
     /**
      * Checks if user has super-user access or user is authorized to produce/consume on a given topic
-     * 
+     *
      * @param topic
      * @return
      */
@@ -128,7 +129,7 @@ protected boolean validateUserAccess(TopicName topic) {
 
     /**
      * Checks if user is authorized to produce/consume on a given topic
-     * 
+     *
      * @param topic
      * @return
      * @throws Exception
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v1/WebSocketProxyStatsV1.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v1/WebSocketProxyStatsV1.java
new file mode 100644
index 000000000..289c587ab
--- /dev/null
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v1/WebSocketProxyStatsV1.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.admin.v1;
+
+import static org.apache.pulsar.common.util.Codec.decode;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.util.Collection;
+import java.util.Map;
+
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.websocket.admin.WebSocketProxyStatsBase;
+import org.apache.pulsar.websocket.stats.ProxyTopicStat;
+
+@Path("/proxy-stats")
+@Api(value = "/proxy", description = "Stats for web-socket proxy", tags = "proxy-stats")
+@Produces(MediaType.APPLICATION_JSON)
+public class WebSocketProxyStatsV1 extends WebSocketProxyStatsBase {
+
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Collection<Metrics> internalGetMetrics() throws Exception {
+        return super.internalGetMetrics();
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/stats")
+    @ApiOperation(value = "Get the stats for the topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist") })
+    public ProxyTopicStat getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic) {
+        return super.internalGetStats(TopicName.get("persistent", property, cluster, namespace, decode(encodedTopic)));
+    }
+
+    @GET
+    @Path("/stats")
+    @ApiOperation(value = "Get the stats for the topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Map<String, ProxyTopicStat> internalGetProxyStats() {
+        return super.internalGetProxyStats();
+    }
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v2/WebSocketProxyStatsV2.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v2/WebSocketProxyStatsV2.java
new file mode 100644
index 000000000..771fbffd2
--- /dev/null
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/v2/WebSocketProxyStatsV2.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.admin.v2;
+
+import static org.apache.pulsar.common.util.Codec.decode;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.util.Collection;
+import java.util.Map;
+
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.websocket.admin.WebSocketProxyStatsBase;
+import org.apache.pulsar.websocket.stats.ProxyTopicStat;
+
+@Path("/proxy-stats")
+@Api(value = "/proxy", description = "Stats for web-socket proxy", tags = "proxy-stats")
+@Produces(MediaType.APPLICATION_JSON)
+public class WebSocketProxyStatsV2 extends WebSocketProxyStatsBase {
+    @GET
+    @Path("/metrics")
+    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each proxy to fetch the metrics", response = Metrics.class, responseContainer = "List")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Collection<Metrics> internalGetMetrics() throws Exception {
+        return super.internalGetMetrics();
+    }
+
+    @GET
+    @Path("/{domain}/{property}/{namespace}/{topic}/stats")
+    @ApiOperation(value = "Get the stats for the topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist") })
+    public ProxyTopicStat getStats(@PathParam("domain") String domain, @PathParam("property") String property,
+            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic) {
+        return super.internalGetStats(TopicName.get(domain, property, namespace, decode(encodedTopic)));
+    }
+
+    @GET
+    @Path("/stats")
+    @ApiOperation(value = "Get the stats for the topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Map<String, ProxyTopicStat> internalGetProxyStats() {
+        return super.internalGetProxyStats();
+    }
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 2829fdc2b..0c7bb4e4b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -63,7 +63,7 @@
     private final List<Handler> handlers = Lists.newArrayList();
     private final WebSocketProxyConfiguration conf;
     private final ExecutorService executorService;
-    
+
     public ProxyServer(WebSocketProxyConfiguration config)
             throws PulsarClientException, MalformedURLException, PulsarServerException {
         this.conf = config;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
index 46720443f..cba785244 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.websocket.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ADMIN_PATH;
+import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ADMIN_PATH_V1;
+import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ADMIN_PATH_V2;
+import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME;
 
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -27,10 +29,10 @@
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
-import org.apache.pulsar.websocket.admin.WebSocketProxyStats;
+import org.apache.pulsar.websocket.admin.v1.WebSocketProxyStatsV1;
+import org.apache.pulsar.websocket.admin.v2.WebSocketProxyStatsV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.pulsar.websocket.admin.WebSocketWebResource.ATTRIBUTE_PROXY_SERVICE_NAME;
 
 public class WebSocketServiceStarter {
 
@@ -55,7 +57,14 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
-        proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service);
+
+
+        proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2, new WebSocketProducerServlet(service));
+        proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2, new WebSocketConsumerServlet(service));
+        proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new WebSocketReaderServlet(service));
+
+        proxyServer.addRestResources(ADMIN_PATH_V1, WebSocketProxyStatsV1.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service);
+        proxyServer.addRestResources(ADMIN_PATH_V2, WebSocketProxyStatsV2.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service);
         proxyServer.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath());
         proxyServer.start();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services