You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/06/01 06:24:13 UTC

[GitHub] sijie closed pull request #1459: [table service] cleanup : provide unified service uri for resolving service endpoints

sijie closed pull request #1459: [table service] cleanup : provide unified service uri for resolving service endpoints
URL: https://github.com/apache/bookkeeper/pull/1459
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java
new file mode 100644
index 000000000..289c3f372
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.net;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.net.URI;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * ServiceURI represents service uri within bookkeeper cluster.
+ *
+ * <h3>Service URI syntax and components</h3>
+ *
+ * <p>At the highest level a service uri is a {@link java.net.URI} in string
+ * form has the syntax.
+ *
+ * <blockquote>
+ * [<i>service</i>[<i>service-specific-part</i>]<b>{@code :}</b>[<b>{@code //}</b><i>authority</i>][<i>path</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code :}</b> and <b>{@code /}</b> stand for themselves.
+ *
+ * <p>The service-specific-part of a service URI consists of the backend information used for services to use.
+ * It has the syntax as below:
+ *
+ * <blockquote>
+ * [({@code -}|{@code +})][<i>backend-part</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code -}</b> and <b>{@code +}</b> stand as a separator to separate service type
+ * from service backend information.
+ *
+ * <p>The authority component of a service URI has the same meaning as the authority component
+ * in a {@link java.net.URI}. If specified, it should be <i>server-based</i>. A server-based
+ * authority parses according to the familiar syntax
+ *
+ * <blockquote>
+ * [<i>user-info</i><b>{@code @}</b>]<i>host</i>[<b>{@code :}</b><i>port</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code @}</b> and <b>{@code :}</b> stand for themselves.
+ *
+ * <p>The path component of a service URI is itself said to be absolute. It typically means which path a service
+ * stores metadata or data.
+ *
+ * <p>All told, then, a service URI instance has the following components:
+ *
+ * <blockquote>
+ * <table summary="Describes the components of a service
+ * URI:service,service-specific-part,authority,user-info,host,port,path">
+ * <tr><td>service</td><td>{@code String}</td></tr>
+ * <tr><td>service-specific-part&nbsp;&nbsp;&nbsp;&nbsp;</td><td>{@code String}</td></tr>
+ * <tr><td>authority</td><td>{@code String}</td></tr>
+ * <tr><td>user-info</td><td>{@code String}</td></tr>
+ * <tr><td>host</td><td>{@code String}</td></tr>
+ * <tr><td>port</td><td>{@code int}</td></tr>
+ * <tr><td>path</td><td>{@code String}</td></tr>
+ * </table>
+ * </blockquote>
+ *
+ * <p>Some examples of service URIs are:
+ *
+ * <blockquote>
+ * {@code zk://localhost:2181/cluster1/ledgers} =&gt; ledger service uri using default ledger manager<br>
+ * {@code zk+hierarchical://localhost:2181/ledgers} =&gt; ledger service uri using hierarchical ledger manager<br>
+ * {@code etcd://localhost/ledgers} =&gt; ledger service uri using etcd as metadata store<br>
+ * {@code distributedlog://localhost:2181/distributedlog} =&gt; distributedlog namespace<br>
+ * {@code distributedlog-bk://localhost:2181/distributedlog} =&gt; distributedlog namespace with bk backend<br>
+ * {@code bk://bookkeeper-cluster/} =&gt; stream storage service uri <br>
+ * {@code host1:port,host2:port} =&gt; a list of hosts as bootstrap hosts to a stream storage cluster}
+ * </blockquote>
+ *
+ * @since 4.8.0
+ */
+@Public
+@Evolving
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@EqualsAndHashCode
+public class ServiceURI {
+
+    /**
+     * Service string for ledger service that uses zookeeper as metadata store.
+     */
+    public static final String SERVICE_ZK   = "zk";
+
+    /**
+     * Service string for dlog service.
+     */
+    public static final String SERVICE_DLOG = "distributedlog";
+
+    /**
+     * Service string for bookkeeper service.
+     */
+    public static final String SERVICE_BK = "bk";
+
+    private static final String SERVICE_SEP = "+";
+    private static final String SERVICE_DLOG_SEP = "-";
+
+    /**
+     * Create a service uri instance from a uri string.
+     *
+     * @param uriStr service uri string
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates RFC&nbsp;2396
+     */
+    public static ServiceURI create(String uriStr) {
+        checkNotNull(uriStr, "service uri string is null");
+
+        // a service uri first should be a valid java.net.URI
+        URI uri = URI.create(uriStr);
+
+        return create(uri);
+    }
+
+    /**
+     * Create a service uri instance from a {@link URI} instance.
+     *
+     * @param uri {@link URI} instance
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates RFC&nbsp;2396
+     */
+    public static ServiceURI create(URI uri) {
+        checkNotNull(uri, "service uri instance is null");
+
+        String serviceName = null;
+        String[] serviceInfos = new String[0];
+        String scheme = uri.getScheme();
+        if (null != scheme) {
+            scheme = scheme.toLowerCase();
+            final String serviceSep;
+            if (scheme.startsWith(SERVICE_DLOG)) {
+                serviceSep = SERVICE_DLOG_SEP;
+            } else {
+                serviceSep = SERVICE_SEP;
+            }
+            String[] schemeParts = StringUtils.split(scheme, serviceSep);
+            serviceName = schemeParts[0];
+            serviceInfos = new String[schemeParts.length - 1];
+            System.arraycopy(schemeParts, 1, serviceInfos, 0, serviceInfos.length);
+        }
+
+        String userAndHostInformation = uri.getAuthority();
+        checkArgument(!Strings.isNullOrEmpty(userAndHostInformation),
+            "authority component is missing in service uri : " + uri);
+
+        String serviceUser;
+        List<String> serviceHosts;
+        int atIndex = userAndHostInformation.indexOf('@');
+        Splitter splitter = Splitter.on(CharMatcher.anyOf(",;"));
+        if (atIndex > 0) {
+            serviceUser = userAndHostInformation.substring(0, atIndex);
+            serviceHosts = splitter.splitToList(userAndHostInformation.substring(atIndex + 1));
+        } else {
+            serviceUser = null;
+            serviceHosts = splitter.splitToList(userAndHostInformation);
+        }
+        serviceHosts.forEach(host -> validateHostName(host));
+
+        String servicePath = uri.getPath();
+        checkArgument(null != servicePath,
+            "service path component is missing in service uri : " + uri);
+
+        return new ServiceURI(
+            serviceName,
+            serviceInfos,
+            serviceUser,
+            serviceHosts.toArray(new String[serviceHosts.size()]),
+            servicePath,
+            uri);
+    }
+
+    private static void validateHostName(String hostname) {
+        String[] parts = hostname.split(":");
+        if (parts.length >= 3) {
+            throw new IllegalArgumentException("Invalid hostname : " + hostname);
+        } else if (parts.length == 2) {
+            try {
+                Integer.parseUnsignedInt(parts[1]);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalArgumentException("Invalid hostname : " + hostname);
+            }
+        }
+
+    }
+
+    private final String serviceName;
+    private final String[] serviceInfos;
+    private final String serviceUser;
+    private final String[] serviceHosts;
+    private final String servicePath;
+    private final URI uri;
+
+    @SuppressFBWarnings("EI_EXPOSE_REP")
+    public String[] getServiceInfos() {
+        return serviceInfos;
+    }
+
+    @SuppressFBWarnings("EI_EXPOSE_REP")
+    public String[] getServiceHosts() {
+        return serviceHosts;
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
similarity index 57%
rename from stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java
rename to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
index 725b5f77f..cb0b13a20 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,23 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.common.resolver;
-
-import io.grpc.NameResolver;
-
 /**
- * The abstract bookkeeper name resolver factory.
- *
- * <p>The name resolver is responsible for creating specific name resolver
- * which provides addresses for {@link io.grpc.LoadBalancer}.
+ * Classes for resolving service uris in bookkeeper.
  */
-public abstract class AbstractNameResolverFactory extends NameResolver.Factory {
-
-    /**
-     * Gets name of the name resolver factory.
-     *
-     * @return name of the name resolver factory.
-     */
-    public abstract String name();
-
-}
+package org.apache.bookkeeper.common.net;
\ No newline at end of file
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java
new file mode 100644
index 000000000..9982c43a0
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.net;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ServiceURI}.
+ */
+public class ServiceURITest {
+
+    private static void assertServiceUri(
+        String serviceUri,
+        String expectedServiceName,
+        String[] expectedServiceInfo,
+        String expectedServiceUser,
+        String[] expectedServiceHosts,
+        String expectedServicePath) {
+
+        ServiceURI serviceURI = ServiceURI.create(serviceUri);
+
+        assertEquals(expectedServiceName, serviceURI.getServiceName());
+        assertArrayEquals(expectedServiceInfo, serviceURI.getServiceInfos());
+        assertEquals(expectedServiceUser, serviceURI.getServiceUser());
+        assertArrayEquals(expectedServiceHosts, serviceURI.getServiceHosts());
+        assertEquals(expectedServicePath, serviceURI.getServicePath());
+    }
+
+    @Test
+    public void testInvalidServiceUris() {
+        String[] uris = new String[] {
+            "://localhost:2181/path/to/namespace",          // missing scheme
+            "bk:///path/to/namespace",                      // missing authority
+            "bk://localhost:2181:3181/path/to/namespace",   // invalid hostname pair
+            "bk://localhost:xyz/path/to/namespace",         // invalid port
+            "bk://localhost:-2181/path/to/namespace",       // negative port
+        };
+
+        for (String uri : uris) {
+            testInvalidServiceUri(uri);
+        }
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullServiceUriString() {
+        ServiceURI.create((String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullServiceUriInstance() {
+        ServiceURI.create((URI) null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testEmptyServiceUriString() {
+        ServiceURI.create("");
+    }
+
+    private void testInvalidServiceUri(String serviceUri) {
+        try {
+            ServiceURI.create(serviceUri);
+            fail("Should fail to parse service uri : " + serviceUri);
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testMissingServiceName() {
+        String serviceUri = "//localhost:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            null, new String[0], null, new String[] { "localhost:2181" }, "/path/to/namespace");
+    }
+
+    @Test
+    public void testEmptyPath() {
+        String serviceUri = "bk://localhost:2181";
+        assertServiceUri(
+            serviceUri,
+            "bk", new String[0], null, new String[] { "localhost:2181" }, "");
+    }
+
+    @Test
+    public void testRootPath() {
+        String serviceUri = "bk://localhost:2181/";
+        assertServiceUri(
+            serviceUri,
+            "bk", new String[0], null, new String[] { "localhost:2181" }, "/");
+    }
+
+    @Test
+    public void testUserInfo() {
+        String serviceUri = "bk://bookkeeper@localhost:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            "bookkeeper",
+            new String[] { "localhost:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsSemiColon() {
+        String serviceUri = "bk://host1:2181;host2:2181;host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsComma() {
+        String serviceUri = "bk://host1:2181,host2:2181,host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPorts() {
+        String serviceUri = "bk://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1", "host2", "host3" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixed() {
+        String serviceUri = "bk://host1:2181,host2,host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testUserInfoWithMultipleHosts() {
+        String serviceUri = "bk://bookkeeper@host1:2181;host2:2181;host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            "bookkeeper",
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoPlus() {
+        String serviceUri = "bk+ssl://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[] { "ssl" },
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoMinus() {
+        String serviceUri = "bk-ssl://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk-ssl",
+            new String[0],
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoDlogMinus() {
+        String serviceUri = "distributedlog-bk://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "distributedlog",
+            new String[] { "bk" },
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
index f0e797467..b1ac23992 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
+++ b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
@@ -28,12 +28,10 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.cli.commands.CmdBase;
 import org.apache.bookkeeper.stream.cli.commands.CmdNamespace;
 import org.apache.bookkeeper.stream.cli.commands.CmdStream;
 import org.apache.bookkeeper.stream.cli.commands.CmdTable;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 
 /**
  * Bookie Shell.
@@ -83,8 +81,8 @@ public static void unregisterSubcommand(String commandName) {
     @Getter(AccessLevel.PACKAGE)
     static class ShellArguments {
 
-        @Parameter(names = { "-s", "--server" }, description = "A storage server address")
-        private String endpoint = null;
+        @Parameter(names = { "-u", "--server-uri" }, description = "The bookkeeper service uri")
+        private String serviceUri = "bk://localhost:4181";
 
         @Parameter(names = { "-n", "--namespace" }, description = "Namespace")
         private String namespace = "default";
@@ -155,16 +153,15 @@ boolean run(String[] args) {
             return false;
         }
 
-        if (null == shellArgs.endpoint) {
+        if (null == shellArgs.serviceUri) {
             System.err.println("No endpoint is provided");
             commander.usage();
             return false;
         }
 
-        Endpoint endpoint = NetUtils.parseEndpoint(shellArgs.endpoint);
-        settingsBuilder.addEndpoints(endpoint);
+        settingsBuilder.serviceUri(shellArgs.serviceUri);
 
-        log.info("connecting to storage service = {}", endpoint);
+        log.info("connecting to storage service = {}", shellArgs.serviceUri);
 
         if (cmdPos == args.length) {
             commander.usage();
diff --git a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
index c80010a87..39bd14ac7 100644
--- a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
+++ b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
@@ -47,7 +47,9 @@ public void testBuildClientNullNamespaceName() {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildClientInvalidNamespaceName() {
         StorageClientBuilder.newBuilder()
-            .withSettings(mock(StorageClientSettings.class))
+            .withSettings(StorageClientSettings.newBuilder()
+                .serviceUri("bk://localhost:4181")
+                .build())
             .withNamespace("invalid-namespace")
             .build();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 87768faaa..9a0fc6d82 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -18,16 +18,11 @@
 
 package org.apache.bookkeeper.clients.config;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.NameResolver;
-import java.util.List;
 import java.util.Optional;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.common.util.Backoff;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.inferred.freebuilder.FreeBuilder;
 
 /**
@@ -44,18 +39,11 @@
     int numWorkerThreads();
 
     /**
-     * Returns the name resolver factory used by zstream client.
-     *
-     * @return name resolver factory.
-     */
-    Optional<NameResolver.Factory> nameResolverFactory();
-
-    /**
-     * Returns the endpoints used by the client builder.
+     * Returns the service uri that storage client should talk to.
      *
-     * @return the list of endpoints.
+     * @return service uri
      */
-    List<Endpoint> endpoints();
+    String serviceUri();
 
     /**
      * Return the endpoint resolver for resolving individual endpoints.
@@ -66,13 +54,6 @@
      */
     EndpointResolver endpointResolver();
 
-    /**
-     * Returns the builder to create the managed channel.
-     *
-     * @return
-     */
-    Optional<ManagedChannelBuilder> managedChannelBuilder();
-
     /**
      * Use of a plaintext connection to the server. By default a secure connection mechanism
      * such as TLS will be used.
@@ -114,14 +95,13 @@
 
         @Override
         public StorageClientSettings build() {
-            checkArgument(
-                nameResolverFactory().isPresent()
-                    || !endpoints().isEmpty()
-                    || managedChannelBuilder().isPresent(),
-                "No name resolver or endpoints or channel builder provided");
-            return super.build();
-        }
+            StorageClientSettings settings = super.build();
+
+            // create a service uri to ensure the service uri is valid
+            ServiceURI.create(serviceUri());
 
+            return settings;
+        }
     }
 
     static Builder newBuilder() {
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
index db1c656fc..8b589c3e6 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
@@ -24,8 +24,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.NameResolver;
 import io.grpc.Status;
 import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
@@ -36,8 +34,8 @@
 import java.util.stream.Stream;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.impl.internal.api.LocationClient;
-import org.apache.bookkeeper.clients.resolver.SimpleStreamResolverFactory;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.clients.utils.GrpcChannels;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -63,28 +61,14 @@ public LocationClientImpl(StorageClientSettings settings,
                               OrderedScheduler scheduler) {
         this.settings = settings;
         this.scheduler = scheduler;
-        ManagedChannelBuilder builder = settings.managedChannelBuilder().orElse(
-            ManagedChannelBuilder
-                .forTarget("stream")
-                .nameResolverFactory(getResolver(settings))
-        );
-        if (settings.usePlaintext()) {
-            builder = builder.usePlaintext(true);
-        }
-        this.channel = builder.build();
+        this.channel = GrpcChannels.createChannelBuilder(
+            settings.serviceUri(), settings
+        ).build();
         this.locationService = GrpcUtils.configureGrpcStub(
             StorageContainerServiceGrpc.newFutureStub(channel),
             Optional.empty());
     }
 
-    private NameResolver.Factory getResolver(StorageClientSettings settings) {
-        if (settings.nameResolverFactory().isPresent()) {
-            return settings.nameResolverFactory().get();
-        } else {
-            return SimpleStreamResolverFactory.of(settings.endpoints());
-        }
-    }
-
     private Stream<Long> getDefaultBackoffs() {
         return Backoff.exponential(
             ClientConstants.DEFAULT_BACKOFF_START_MS,
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java
deleted file mode 100644
index 512ee06c2..000000000
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.clients.resolver;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import io.grpc.Attributes;
-import io.grpc.NameResolver;
-import java.net.URI;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.bookkeeper.clients.utils.ClientResources;
-import org.apache.bookkeeper.common.resolver.AbstractNameResolverFactory;
-import org.apache.bookkeeper.common.resolver.SimpleNameResolver;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-
-/**
- * A simple resolver factory that creates simple name resolver.
- */
-public class SimpleStreamResolverFactory extends AbstractNameResolverFactory {
-
-    private static final String SCHEME = "stream";
-    private static final String NAME = "simple";
-
-    public static final SimpleStreamResolverFactory of(List<Endpoint> endpoints) {
-        return new SimpleStreamResolverFactory(endpoints);
-    }
-
-    public static final SimpleStreamResolverFactory of(Endpoint... endpoints) {
-        return new SimpleStreamResolverFactory(Lists.newArrayList(endpoints));
-    }
-
-    private final List<URI> endpointURIs;
-
-    private SimpleStreamResolverFactory(List<Endpoint> endpoints) {
-        this.endpointURIs = Lists.transform(
-            endpoints,
-            new Function<Endpoint, URI>() {
-                @Override
-                public URI apply(Endpoint endpoint) {
-                    return URI.create(SCHEME + "://" + endpoint.getHostname() + ":" + endpoint.getPort());
-                }
-            }
-        );
-    }
-
-    @Override
-    public String name() {
-        return NAME;
-    }
-
-    @Nullable
-    @Override
-    public NameResolver newNameResolver(URI targetUri, Attributes params) {
-        if (!Objects.equal(SCHEME, targetUri.getScheme())) {
-            return null;
-        }
-        String targetPath = checkNotNull(targetUri.getPath());
-        checkArgument(targetPath.startsWith("/"),
-            "the path component (%s) of the target (%s) must start with '/'",
-            targetPath, targetUri);
-        String name = targetPath.substring(1);
-        return new SimpleNameResolver(name, ClientResources.shared().executor(), this.endpointURIs);
-    }
-
-    @Override
-    public String getDefaultScheme() {
-        return SCHEME;
-    }
-}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
new file mode 100644
index 000000000..8588359f0
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.utils;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.ServiceNameResolverProvider;
+
+/**
+ * Utils to create grpc channels.
+ */
+@Slf4j
+public final class GrpcChannels {
+
+    private static final String BACKEND_INPROCESS = "inprocess";
+
+    private GrpcChannels() {}
+
+    /**
+     * Create a channel builder from <tt>serviceUri</tt> with client <tt>settings</tt>.
+     *
+     * @param serviceUri service uri
+     * @param settings client settings
+     * @return managed channel builder
+     */
+    public static ManagedChannelBuilder createChannelBuilder(String serviceUri,
+                                                             StorageClientSettings settings) {
+        ServiceURI uri = ServiceURI.create(serviceUri);
+
+        ManagedChannelBuilder builder;
+        if (uri.getServiceInfos().length > 0 && uri.getServiceInfos()[0].equals(BACKEND_INPROCESS)) {
+            // this is an inprocess service, so build an inprocess channel.
+            String serviceName = uri.getServiceHosts()[0];
+            builder = InProcessChannelBuilder.forName(serviceName).directExecutor();
+        } else {
+            builder = ManagedChannelBuilder.forTarget(serviceUri)
+                .nameResolverFactory(new ServiceNameResolverProvider().toFactory());
+        }
+        if (settings.usePlaintext()) {
+            builder = builder.usePlaintext();
+        }
+        return builder;
+    }
+
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
index ae08c8366..a4e4294fc 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
@@ -23,9 +23,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import com.google.common.collect.Lists;
-import java.util.List;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.junit.Test;
 
 /**
@@ -35,14 +32,10 @@
 
     @Test
     public void testDefault() {
-        List<Endpoint> endpoints = Lists.newArrayList(
-            Endpoint.newBuilder()
-                .setHostname("127.0.0.1")
-                .setPort(80)
-                .build());
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addAllEndpoints(endpoints)
+            .serviceUri("bk://127.0.0.1:4181/")
             .build();
+        assertEquals("bk://127.0.0.1:4181/", settings.serviceUri());
         assertEquals(Runtime.getRuntime().availableProcessors(), settings.numWorkerThreads());
         assertTrue(settings.usePlaintext());
         assertFalse(settings.clientName().isPresent());
@@ -53,8 +46,8 @@ public void testEmptyBuilder() {
         try {
             StorageClientSettings.newBuilder().build();
             fail("Should fail with missing endpoints");
-        } catch (IllegalArgumentException iae) {
-            assertEquals("No name resolver or endpoints or channel builder provided", iae.getMessage());
+        } catch (IllegalStateException iae) {
+            assertEquals("Not set: [serviceUri]", iae.getMessage());
         }
     }
 
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
index 9088fde8e..f90a9aa21 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
@@ -50,7 +50,7 @@
         .setPort(4181)
         .build();
 
-    protected String serverName = "fake server for " + getClass();
+    protected String serverName;
     protected final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
     protected Server fakeServer;
     protected OrderedScheduler scheduler;
@@ -61,6 +61,7 @@
 
     @Before
     public void setUp() throws Exception {
+        serverName = "fake-server";
         fakeServer = InProcessServerBuilder
             .forName(serverName)
             .fallbackHandlerRegistry(serviceRegistry)
@@ -72,8 +73,7 @@ public void setUp() throws Exception {
             .numThreads(Runtime.getRuntime().availableProcessors())
             .build();
         settings = StorageClientSettings.newBuilder()
-            .managedChannelBuilder(InProcessChannelBuilder.forName(serverName).directExecutor())
-            .usePlaintext(true)
+            .serviceUri("bk+inprocess://" + serverName)
             .build();
         serverManager = new StorageServerClientManagerImpl(
             settings,
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
index a962a2170..c61052c48 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import org.apache.bookkeeper.clients.exceptions.ClientException;
 import org.apache.bookkeeper.clients.exceptions.InvalidNamespaceNameException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
@@ -118,7 +119,7 @@ public void testRequestSuccess() throws Exception {
 
         RootRangeServiceImplBase rootRangeService = createRootRangeServiceForSuccess();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -138,7 +139,7 @@ public void testRequestFailure() throws Exception {
 
         RootRangeServiceImplBase rootRangeService = createRootRangeServiceForRequestFailure();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -158,7 +159,7 @@ public void testRpcFailure() throws Exception {
 
         RootRangeServiceImplBase rootRangeService = createRootRangeServiceForRpcFailure();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
index c2d4fc25d..665b8ac92 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
@@ -30,7 +30,6 @@
 import io.grpc.Status;
 import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
-import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -129,8 +128,7 @@ public void getStorageContainerEndpoint(GetStorageContainerEndpointRequest reque
     protected void doSetup() throws Exception {
         StorageClientSettings settings =
             StorageClientSettings.newBuilder()
-                .managedChannelBuilder(InProcessChannelBuilder.forName(serverName).directExecutor())
-                .usePlaintext(true)
+                .serviceUri("bk+inprocess://" + serverName)
                 .build();
         locationClient = new LocationClientImpl(settings, scheduler);
         locationServiceDefinition = locationService.bindService();
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
index 971616245..985b95515 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
@@ -34,6 +34,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import lombok.Cleanup;
 import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager;
@@ -145,7 +146,7 @@ public void getActiveRanges(GetActiveRangesRequest request,
         };
         serviceRegistry.addService(metaRangeService.bindService());
 
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -169,7 +170,7 @@ public void getActiveRanges(GetActiveRangesRequest request,
         };
         serviceRegistry.addService(metaRangeService.bindService());
 
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
new file mode 100644
index 000000000..9e11adf53
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import io.grpc.internal.DnsNameResolverProvider;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+
+/**
+ * An implementation of {@link NameResolverProvider} that provides {@link NameResolver}s
+ * to resolve {@link org.apache.bookkeeper.common.net.ServiceURI}.
+ */
+@Slf4j
+public final class ServiceNameResolverProvider extends NameResolverProvider {
+
+    private final DnsNameResolverProvider dnsProvider;
+    private final Resource<ExecutorService> executorResource;
+
+    public ServiceNameResolverProvider() {
+        this.dnsProvider = new DnsNameResolverProvider();
+        this.executorResource = new Resource<ExecutorService>() {
+            @Override
+            public ExecutorService create() {
+                return Executors.newSingleThreadScheduledExecutor();
+            }
+
+            @Override
+            public void close(ExecutorService instance) {
+                instance.shutdown();
+            }
+        };
+    }
+
+    @Override
+    protected boolean isAvailable() {
+        return true;
+    }
+
+    @Override
+    protected int priority() {
+        return 10;
+    }
+
+    @Nullable
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        ServiceURI serviceURI;
+        try {
+            serviceURI = ServiceURI.create(targetUri);
+        } catch (NullPointerException | IllegalArgumentException e) {
+            // invalid uri here, so return null to allow grpc to use other name resolvers
+            log.info("ServiceNameResolverProvider doesn't know how to resolve {} : cause {}",
+                targetUri, e.getMessage());
+            return null;
+        }
+
+        if (null == serviceURI.getServiceName()
+            || ServiceURI.SERVICE_BK.equals(serviceURI.getServiceName())) {
+
+            String[] hosts = serviceURI.getServiceHosts();
+            if (hosts.length == 0) {
+                // no host is find, so return null to let grpc choose other resolver.
+                return null;
+            } else if (hosts.length == 1) {
+                // create a dns name resolver
+                URI dnsUri = URI.create("dns:///" + hosts[0]);
+                return dnsProvider.newNameResolver(dnsUri, params);
+            } else {
+                // create a static resolver taking the list of servers.
+                List<String> hostList = new ArrayList<>();
+                for (String host : hosts) {
+                    hostList.add(host);
+                }
+                List<URI> hostUris = Lists.transform(
+                    hostList,
+                    new Function<String, URI>() {
+                        @Nullable
+                        @Override
+                        public URI apply(@Nullable String host) {
+                            return URI.create("//" + host);
+                        }
+                    }
+                );
+
+                return new StaticNameResolver(
+                    "static",
+                    executorResource,
+                    hostUris);
+            }
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        return ServiceURI.SERVICE_BK;
+    }
+
+    public NameResolver.Factory toFactory() {
+        return new NameResolverFactory(Lists.newArrayList(this));
+    }
+
+    private static class NameResolverFactory extends NameResolver.Factory {
+        private final List<NameResolverProvider> providers;
+
+        public NameResolverFactory(List<NameResolverProvider> providers) {
+            this.providers = providers;
+        }
+
+        @Override
+        public NameResolver newNameResolver(URI targetUri, Attributes params) {
+            checkForProviders();
+            for (NameResolverProvider provider : providers) {
+                NameResolver resolver = provider.newNameResolver(targetUri, params);
+                if (resolver != null) {
+                    return resolver;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String getDefaultScheme() {
+            checkForProviders();
+            return providers.get(0).getDefaultScheme();
+        }
+
+        private void checkForProviders() {
+            checkState(!providers.isEmpty(),
+                "No NameResolverProviders found. Please check your configuration");
+        }
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
similarity index 94%
rename from stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java
rename to stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
index 7ff4e89e9..3f24ab9d7 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
@@ -32,11 +32,11 @@
 /**
  * A simple name resolver that returns pre-configured host addresses.
  */
-public class SimpleNameResolver extends AbstractNameResolver {
+public class StaticNameResolver extends AbstractNameResolver {
 
     private final List<EquivalentAddressGroup> servers;
 
-    public SimpleNameResolver(String name,
+    public StaticNameResolver(String name,
                               Resource<ExecutorService> executorResource,
                               List<URI> endpoints) {
         super(name, executorResource);
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java b/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
index 58a09719a..a7814867a 100644
--- a/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
@@ -28,12 +28,12 @@
 import java.net.URI;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.bookkeeper.common.resolver.SimpleNameResolver;
+import org.apache.bookkeeper.common.resolver.StaticNameResolver;
 import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.junit.Test;
 
 /**
- * Unit test of {@link SimpleNameResolver}.
+ * Unit test of {@link StaticNameResolver}.
  */
 public class TestSimpleNameResolver {
 
@@ -61,7 +61,7 @@ public void testGetServers() {
             .collect(Collectors.toList());
 
         @SuppressWarnings("unchecked") // for the mock
-            SimpleNameResolver nameResolver = new SimpleNameResolver(
+            StaticNameResolver nameResolver = new StaticNameResolver(
             "test-name-resolver",
             mock(Resource.class),
             uris);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
index c89db4745..635f86ff5 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
@@ -26,16 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
-import org.apache.commons.lang.StringUtils;
+import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 
-
-
-
-
 /**
  * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization.
  */
@@ -228,17 +224,14 @@ public static long bytes2LogSegmentId(byte[] data) {
      * @return the normalized uri
      */
     public static URI normalizeURI(URI uri) {
-        checkNotNull(uri, "DistributedLog uri is null");
-        String scheme = uri.getScheme();
-        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
-        scheme = scheme.toLowerCase();
-        String[] schemeParts = StringUtils.split(scheme, '-');
-        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
+        ServiceURI serviceURI = ServiceURI.create(uri);
+        checkNotNull(serviceURI.getServiceName(), "Invalid distributedlog uri : " + uri);
+        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, serviceURI.getServiceName()),
                 "Unknown distributedlog scheme found : " + uri);
         URI normalizedUri;
         try {
             normalizedUri = new URI(
-                    schemeParts[0],     // remove backend info
+                    serviceURI.getServiceName(),     // remove backend info
                     uri.getAuthority(),
                     uri.getPath(),
                     uri.getQuery(),
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
index 7ee7cb3b3..b905c41af 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
@@ -28,9 +28,6 @@
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.junit.Test;
 
-
-
-
 /**
  * Test Namespace Builder.
  */
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index ff0cda249..e5a6df43a 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -29,11 +29,13 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
+import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -226,12 +228,18 @@ private void startServers() throws Exception {
         executor.shutdown();
     }
 
+
     private void createDefaultNamespaces() throws Exception {
+        String serviceUri = String.format(
+            "bk://%s/",
+            getRpcEndpoints().stream()
+                .map(endpoint -> NetUtils.endpointToString(endpoint))
+                .collect(Collectors.joining(",")));
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getRpcEndpoints().toArray(new Endpoint[getRpcEndpoints().size()]))
+            .serviceUri(serviceUri)
             .usePlaintext(true)
             .build();
-        log.info("RpcEndpoints are : {}", settings.endpoints());
+        log.info("Service uri are : {}", serviceUri);
         String namespaceName = "default";
         try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
             .withSettings(settings)
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index fe7c91431..a7d8549b5 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -78,8 +78,13 @@ protected static int getNumBookies() {
     //
 
     protected static StorageClientSettings newStorageClientSettings() {
+        String serviceUri = String.format(
+            "bk://%s/",
+            getExsternalStreamEndpoints().stream()
+                .map(endpoint -> NetUtils.endpointToString(endpoint))
+                .collect(Collectors.joining(",")));
         return StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
+            .serviceUri(serviceUri)
             .endpointResolver(endpoint -> {
                 String internalEndpointStr = NetUtils.endpointToString(endpoint);
                 String externalEndpointStr =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services