You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/12/01 00:29:04 UTC

[GitHub] sijie closed pull request #1842: [table service] add a bookie registration based grpc name resolver

sijie closed pull request #1842:  [table service] add a bookie registration based grpc name resolver
URL: https://github.com/apache/bookkeeper/pull/1842
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/stream/bk-grpc-name-resolver/pom.xml b/stream/bk-grpc-name-resolver/pom.xml
new file mode 100644
index 0000000000..312adfbf1d
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>stream-storage-parent</artifactId>
+    <groupId>org.apache.bookkeeper</groupId>
+    <version>4.9.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <groupId>org.apache.bookkeeper</groupId>
+  <artifactId>bk-grpc-name-resolver</artifactId>
+  <name>Apache BookKeeper :: Stream Storage :: Common :: BK Grpc Name Resolver</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java
new file mode 100644
index 0000000000..cc25978079
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+
+/**
+ * A {@link NameResolver} implementation based on bookkeeper {@link org.apache.bookkeeper.discover.RegistrationClient}.
+ */
+class BKRegistrationNameResolver extends NameResolver {
+
+    private final MetadataClientDriver clientDriver;
+    private final URI serviceURI;
+    private final ScheduledExecutorService executor;
+
+    private Listener listener;
+    private boolean shutdown;
+    private boolean resolving;
+
+    BKRegistrationNameResolver(MetadataClientDriver clientDriver,
+                               URI serviceURI) {
+        this.clientDriver = clientDriver;
+        this.serviceURI = serviceURI;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("registration-name-resolver").build());
+    }
+
+    @Override
+    public String getServiceAuthority() {
+        return serviceURI.getAuthority();
+    }
+
+    @Override
+    public synchronized void start(Listener listener) {
+        checkState(null == this.listener, "Resolver already started");
+        this.listener = Objects.requireNonNull(listener, "Listener is null");
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(serviceURI.toString());
+
+        try {
+            clientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+        } catch (MetadataException e) {
+            throw new RuntimeException("Failed to initialize registration client driver at " + serviceURI, e);
+        }
+
+        resolve();
+    }
+
+    // Watch writable bookies and push every update to the listener; a failed registration is retried.
+    private synchronized void resolve() {
+        if (resolving || shutdown) {
+            return;
+        }
+        resolving = true;
+        this.clientDriver.getRegistrationClient().watchWritableBookies(bookies -> {
+            Listener savedListener;
+            synchronized (this) {
+                savedListener = listener;
+            }
+            savedListener.onAddresses(
+                hostsToEquivalentAddressGroups(bookies.getValue()),
+                Attributes.EMPTY
+            );
+        }).whenComplete((ignored, cause) -> {
+            synchronized (this) {
+                // clear the flag before retrying: resolve() returns immediately while it is still set
+                resolving = false;
+            }
+            if (null != cause) {
+                // resolve() re-checks `shutdown`, so the retry stops once shutdown() has been called
+                resolve();
+            }
+        });
+    }
+
+    private static List<EquivalentAddressGroup> hostsToEquivalentAddressGroups(Set<BookieSocketAddress> bookies) {
+        return bookies.stream()
+            .map(addr -> new EquivalentAddressGroup(
+                Collections.singletonList(addr.getSocketAddress()),
+                Attributes.EMPTY
+            ))
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public void shutdown() {
+        synchronized (this) {
+            if (shutdown) {
+                return;
+            }
+            shutdown = true;
+        }
+        executor.shutdown();
+        clientDriver.close();
+    }
+}
diff --git a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java
new file mode 100644
index 0000000000..1c61de72ae
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import com.google.common.collect.Lists;
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import java.net.URI;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.NameResolverFactoryProvider;
+import org.apache.bookkeeper.common.resolver.NameResolverProviderFactory;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+
+/**
+ * An implementation of {@link NameResolverProvider} that provides {@link io.grpc.NameResolver}s
+ * to resolve servers registered using bookkeeper registration library.
+ */
+@Slf4j
+public class BKRegistrationNameResolverProvider extends NameResolverFactoryProvider {
+
+    @Override
+    protected boolean isAvailable() {
+        return true;
+    }
+
+    @Override
+    protected int priority() {
+        return 100;
+    }
+
+    @Nullable
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        ServiceURI serviceURI;
+        try {
+            serviceURI = ServiceURI.create(targetUri);
+        } catch (NullPointerException | IllegalArgumentException e) {
+            // invalid uri here, so return null to allow grpc to use other name resolvers
+            log.info("BKRegistrationNameResolverProvider doesn't know how to resolve {} : cause {}",
+                targetUri, e.getMessage());
+            return null;
+        }
+
+        MetadataClientDriver clientDriver;
+        try {
+            clientDriver = MetadataDrivers.getClientDriver(serviceURI.getUri());
+            return new BKRegistrationNameResolver(clientDriver, serviceURI.getUri());
+        } catch (IllegalArgumentException iae) {
+            log.error("Unknown service uri : {}", serviceURI, iae);
+            return null;
+        }
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        return ServiceURI.SERVICE_ZK;
+    }
+
+    @Override
+    public NameResolver.Factory toFactory() {
+        return new NameResolverProviderFactory(Lists.newArrayList(this));
+    }
+}
diff --git a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java
new file mode 100644
index 0000000000..5e82b08fa6
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A collection of grpc resolvers.
+ */
+package org.apache.bookkeeper.grpc.resolver;
\ No newline at end of file
diff --git a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
new file mode 100644
index 0000000000..726c8b00b9
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import io.grpc.NameResolver.Listener;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link BKRegistrationNameResolver}.
+ */
+public class BKRegistrationNameResolverTest extends BookKeeperClusterTestCase {
+
+    private static final String ROOT_PATH = "/resolver-test";
+    private static final String SERVERS_PATH = ROOT_PATH + "/servers";
+
+    private final BKRegistrationNameResolverProvider resolverProvider;
+
+    private MetadataBookieDriver bookieDriver;
+    private URI serviceUri;
+
+    public BKRegistrationNameResolverTest() {
+        super(0);
+        this.resolverProvider = new BKRegistrationNameResolverProvider();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        zkc.transaction()
+            .create(ROOT_PATH, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+            .create(SERVERS_PATH, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+            .create(SERVERS_PATH + "/" + AVAILABLE_NODE, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+            .commit();
+
+        serviceUri = URI.create("zk://" + zkUtil.getZooKeeperConnectString() + SERVERS_PATH);
+
+
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.setMetadataServiceUri(serviceUri.toString());
+        bookieDriver = MetadataDrivers.getBookieDriver(serviceUri);
+        bookieDriver.initialize(serverConf, () -> {}, NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        bookieDriver.close();
+
+        super.tearDown();
+    }
+
+    @Test
+    public void testNameResolver() throws Exception {
+        int numServers = 3;
+
+        Set<SocketAddress> addressSet = new HashSet<>();
+        for (int i = 0; i < numServers; i++) {
+            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            addressSet.add(address);
+            bookieDriver.getRegistrationManager().registerBookie(
+                "127.0.0.1:" + (3181 + i), false
+            );
+        }
+
+        LinkedBlockingQueue<List<EquivalentAddressGroup>> notifications = new LinkedBlockingQueue<>();
+
+
+        @Cleanup("shutdown")
+        NameResolver resolver = resolverProvider.newNameResolver(serviceUri, Attributes.EMPTY);
+        resolver.start(new Listener() {
+            @Override
+            public void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes) {
+                notifications.add(servers);
+            }
+
+            @Override
+            public void onError(Status error) {
+
+            }
+        });
+
+        List<EquivalentAddressGroup> groups = notifications.take();
+        assertEquals(numServers, groups.size());
+
+        Set<SocketAddress> receivedSet = groups.stream()
+            .map(group -> group.getAddresses().get(0))
+            .collect(Collectors.toSet());
+        assertEquals(addressSet, receivedSet);
+
+        // add 3 more servers
+
+        for (int i = numServers; i < 2 * numServers; i++) {
+            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            addressSet.add(address);
+            bookieDriver.getRegistrationManager().registerBookie(
+                "127.0.0.1:" + (3181 + i), false
+            );
+        }
+
+        List<EquivalentAddressGroup> notification = notifications.take();
+        while (notification.size() < 2 * numServers) {
+            notification = notifications.take();
+        }
+        assertEquals(2 * numServers, notification.size());
+        receivedSet = notification.stream()
+            .map(group -> group.getAddresses().get(0))
+            .collect(Collectors.toSet());
+        assertEquals(addressSet, receivedSet);
+    }
+
+}
diff --git a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java
new file mode 100644
index 0000000000..9df1bf1deb
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.utils.GrpcChannels;
+import org.junit.Test;
+
+/**
+ * Unit test {@link org.apache.bookkeeper.clients.utils.GrpcChannels} with registration based name resolver.
+ */
+public class GrpcChannelsTest {
+
+    @Test
+    public void testZKServiceUri() {
+        String serviceUri = "zk://127.0.0.1/stream/servers";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build());
+        assertTrue(builder instanceof NettyChannelBuilder);
+    }
+
+}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
index 8588359f09..f05caec673 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
@@ -23,7 +23,9 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.NameResolverFactoryProvider;
 import org.apache.bookkeeper.common.resolver.ServiceNameResolverProvider;
+import org.apache.bookkeeper.common.util.ReflectionUtils;
 
 /**
  * Utils to create grpc channels.
@@ -32,6 +34,8 @@
 public final class GrpcChannels {
 
     private static final String BACKEND_INPROCESS = "inprocess";
+    private static final String BK_REG_NAME_RESOLVER_PROVIDER =
+        "org.apache.bookkeeper.grpc.resolver.BKRegistrationNameResolverProvider";
 
     private GrpcChannels() {}
 
@@ -51,9 +55,22 @@ public static ManagedChannelBuilder createChannelBuilder(String serviceUri,
             // this is an inprocess service, so build an inprocess channel.
             String serviceName = uri.getServiceHosts()[0];
             builder = InProcessChannelBuilder.forName(serviceName).directExecutor();
-        } else {
+        } else if (null == uri.getServiceName() || ServiceURI.SERVICE_BK.equals(uri.getServiceName())) {
             builder = ManagedChannelBuilder.forTarget(serviceUri)
                 .nameResolverFactory(new ServiceNameResolverProvider().toFactory());
+        } else {
+            NameResolverFactoryProvider provider;
+            try {
+                provider = ReflectionUtils.newInstance(
+                    BK_REG_NAME_RESOLVER_PROVIDER,
+                    NameResolverFactoryProvider.class);
+            } catch (RuntimeException re) {
+                log.error("It seems that you don't have `bk-grpc-name-resolver` in your class path."
+                    + " Please make sure you include it as your application's dependency.");
+                throw re;
+            }
+            builder = ManagedChannelBuilder.forTarget(serviceUri)
+                .nameResolverFactory(provider.toFactory());
         }
         if (settings.usePlaintext()) {
             builder = builder.usePlaintext();
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java
new file mode 100644
index 0000000000..3e3e81e98e
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.junit.Test;
+
+/**
+ * Unit test {@link GrpcChannels}.
+ */
+public class GrpcChannelsTest {
+
+    @Test
+    public void testInprocessServiceUri() {
+        String serviceUri = "bk+inprocess://service";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+        );
+        assertTrue(builder instanceof InProcessChannelBuilder);
+    }
+
+    @Test
+    public void testBKServiceUri() {
+        String serviceUri = "bk://127.0.0.1";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+        );
+        assertTrue(builder instanceof NettyChannelBuilder);
+    }
+
+    @Test
+    public void testZKServiceUri() {
+        String serviceUri = "zk://127.0.0.1/stream/servers";
+        try {
+            GrpcChannels.createChannelBuilder(
+                serviceUri,
+                StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+            );
+            fail("Should fail to create grpc channel because `bk-grpc-name-resolver` is not in the classpath");
+        } catch (RuntimeException re) {
+            assertTrue(re.getCause() instanceof ClassNotFoundException);
+        }
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java
new file mode 100644
index 0000000000..c8edeaee4a
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+
+/**
+ * A {@link NameResolverProvider} that provides a method to convert itself to a {@link NameResolver.Factory}.
+ */
+public abstract class NameResolverFactoryProvider extends NameResolverProvider {
+
+    /**
+     * Convert the provider to a {@link NameResolver.Factory}.
+     *
+     * @return the name resolver factory.
+     */
+    public abstract NameResolver.Factory toFactory();
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java
new file mode 100644
index 0000000000..571ef6f7f9
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * A {@link io.grpc.NameResolverProvider} based {@link NameResolver.Factory}.
+ */
+public class NameResolverProviderFactory extends NameResolver.Factory {
+
+    private final List<NameResolverProvider> providers;
+
+    public NameResolverProviderFactory(List<NameResolverProvider> providers) {
+        this.providers = providers;
+    }
+
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        checkForProviders();
+        for (NameResolverProvider provider : providers) {
+            NameResolver resolver = provider.newNameResolver(targetUri, params);
+            if (resolver != null) {
+                return resolver;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        checkForProviders();
+        return providers.get(0).getDefaultScheme();
+    }
+
+    private void checkForProviders() {
+        checkState(!providers.isEmpty(),
+            "No NameResolverProviders found. Please check your configuration");
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
index 9e11adf537..05bf7b5b6d 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.bookkeeper.common.resolver;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import io.grpc.Attributes;
@@ -41,7 +39,7 @@
  * to resolve {@link org.apache.bookkeeper.common.net.ServiceURI}.
  */
 @Slf4j
-public final class ServiceNameResolverProvider extends NameResolverProvider {
+public final class ServiceNameResolverProvider extends NameResolverFactoryProvider {
 
     private final DnsNameResolverProvider dnsProvider;
     private final Resource<ExecutorService> executorResource;
@@ -127,38 +125,9 @@ public String getDefaultScheme() {
         return ServiceURI.SERVICE_BK;
     }
 
+    @Override
     public NameResolver.Factory toFactory() {
-        return new NameResolverFactory(Lists.newArrayList(this));
+        return new NameResolverProviderFactory(Lists.newArrayList(this));
     }
 
-    private static class NameResolverFactory extends NameResolver.Factory {
-        private final List<NameResolverProvider> providers;
-
-        public NameResolverFactory(List<NameResolverProvider> providers) {
-            this.providers = providers;
-        }
-
-        @Override
-        public NameResolver newNameResolver(URI targetUri, Attributes params) {
-            checkForProviders();
-            for (NameResolverProvider provider : providers) {
-                NameResolver resolver = provider.newNameResolver(targetUri, params);
-                if (resolver != null) {
-                    return resolver;
-                }
-            }
-            return null;
-        }
-
-        @Override
-        public String getDefaultScheme() {
-            checkForProviders();
-            return providers.get(0).getDefaultScheme();
-        }
-
-        private void checkForProviders() {
-            checkState(!providers.isEmpty(),
-                "No NameResolverProviders found. Please check your configuration");
-        }
-    }
 }
diff --git a/stream/pom.xml b/stream/pom.xml
index f4866816fb..e8a04916cb 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,6 +38,7 @@
     <module>clients</module>
     <module>storage</module>
     <module>server</module>
+    <module>bk-grpc-name-resolver</module>
   </modules>
 
   <build>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services