You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/25 08:02:16 UTC

[GitHub] sijie closed pull request #1430: [table service] client interceptor and storage container grpc proxy

sijie closed pull request #1430: [table service] client interceptor and storage container grpc proxy
URL: https://github.com/apache/bookkeeper/pull/1430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 34dc95790..7e1e02253 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -19,12 +19,15 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -53,7 +56,7 @@
     }
 
     private final Optional<String> token;
-    private final ManagedChannel channel;
+    private final Channel channel;
 
     @GuardedBy("this")
     private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +90,11 @@ public StorageServerChannel(Endpoint endpoint,
     @VisibleForTesting
     public StorageServerChannel(ManagedChannel channel,
                                 Optional<String> token) {
+        this((Channel) channel, token);
+    }
+
+    protected StorageServerChannel(Channel channel,
+                                   Optional<String> token) {
         this.token = token;
         this.channel = channel;
     }
@@ -127,8 +135,26 @@ public synchronized TableServiceFutureStub getTableService() {
         return kvService;
     }
 
+    /**
+     * Create an intercepted server channel that adds additional storage container metadata.
+     *
+     * @param scId storage container id
+     * @return an intercepted server channel.
+     */
+    public StorageServerChannel intercept(long scId) {
+        Channel interceptedChannel = ClientInterceptors.intercept(
+            this.channel,
+            new StorageContainerClientInterceptor(scId));
+
+        return new StorageServerChannel(
+            interceptedChannel,
+            this.token);
+    }
+
     @Override
     public void close() {
-        channel.shutdown();
+        if (channel instanceof ManagedChannel) {
+            ((ManagedChannel) channel).shutdown();
+        }
     }
 }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 8635e0fd4..a20d7c9bd 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -190,9 +190,13 @@ private void handleFetchStorageContainerInfoSuccess(
             }
             return;
         }
+
+        // intercept the storage server channel with additional sc metadata
+        StorageServerChannel interceptedChannel = serverChannel.intercept(scId);
+
         // update the future
         synchronized (this) {
-            rsChannelFuture.complete(serverChannel);
+            rsChannelFuture.complete(interceptedChannel);
         }
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
new file mode 100644
index 000000000..284431e99
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.container;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+
+/**
+ * A client interceptor that intercepts outgoing calls to storage containers.
+ */
+public class StorageContainerClientInterceptor implements ClientInterceptor {
+
+    private static final String SC_ID_KEY = "SC_ID";
+
+    private final long scId;
+    private final Metadata.Key<Long> scIdKey;
+
+    public StorageContainerClientInterceptor(long scId) {
+        this.scId = scId;
+        this.scIdKey = Metadata.Key.of(
+            SC_ID_KEY,
+            LongBinaryMarshaller.of());
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                               CallOptions callOptions,
+                                                               Channel next) {
+        return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+            @Override
+            protected void checkedStart(Listener<RespT> responseListener,
+                                        Metadata headers) throws Exception {
+                headers.put(scIdKey, scId);
+                delegate().start(responseListener, headers);
+            }
+        };
+    }
+}
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index e4052a5ec..a26aed640 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -41,5 +41,11 @@
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
new file mode 100644
index 000000000..8b1c93932
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+
+/**
+ * Marshaller for byte array.
+ */
+public class IdentityBinaryMarshaller implements BinaryMarshaller<byte[]> {
+
+    public static IdentityBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final IdentityBinaryMarshaller INSTANCE = new IdentityBinaryMarshaller();
+
+    private IdentityBinaryMarshaller() {}
+
+    @Override
+    public byte[] toBytes(byte[] value) {
+        return value;
+    }
+
+    @Override
+    public byte[] parseBytes(byte[] serialized) {
+        return serialized;
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
new file mode 100644
index 000000000..fd54b5eda
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.MethodDescriptor;
+import java.io.InputStream;
+
+/**
+ * An identity marshaller that passes {@link InputStream}s through unchanged.
+ */
+public class IdentityInputStreamMarshaller implements MethodDescriptor.Marshaller<InputStream>  {
+
+    public static IdentityInputStreamMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final IdentityInputStreamMarshaller INSTANCE = new IdentityInputStreamMarshaller();
+
+    private IdentityInputStreamMarshaller() {}
+
+    @Override
+    public InputStream stream(InputStream value) {
+        return value;
+    }
+
+    @Override
+    public InputStream parse(InputStream stream) {
+        return stream;
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
new file mode 100644
index 000000000..92194f14e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+import org.apache.bookkeeper.common.util.Bytes;
+
+/**
+ * Marshaller for long numbers.
+ */
+public class LongBinaryMarshaller implements BinaryMarshaller<Long> {
+
+    public static LongBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final LongBinaryMarshaller INSTANCE = new LongBinaryMarshaller();
+
+    private LongBinaryMarshaller() {}
+
+    @Override
+    public byte[] toBytes(Long value) {
+        return Bytes.toBytes(value);
+    }
+
+    @Override
+    public Long parseBytes(byte[] serialized) {
+        return Bytes.toLong(serialized, 0);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
new file mode 100644
index 000000000..3e69f6898
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Netty Utils.
+ */
+package org.apache.bookkeeper.common.grpc.netty;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
new file mode 100644
index 000000000..d943bdf83
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Utils.
+ */
+package org.apache.bookkeeper.common.grpc;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
new file mode 100644
index 000000000..698c380ac
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+/**
+ * Find a grpc {@link io.grpc.Channel} to route the requests.
+ */
+public interface ChannelFinder {
+
+    /**
+     * Find a channel to route the server call.
+     *
+     * @param serverCall server call
+     * @param headers request metadata
+     * @return channel to route the server call.
+     */
+    Channel findChannel(ServerCall<?, ?> serverCall,
+                        Metadata headers);
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
new file mode 100644
index 000000000..35f6d2d8b
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import lombok.Getter;
+
+/**
+ * Proxy grpc calls.
+ */
+@Getter
+class ProxyCall<ReqT, RespT> {
+
+    private final RequestProxy serverCallListener;
+    private final ResponseProxy clientCallListener;
+
+    ProxyCall(ServerCall<ReqT, RespT> serverCall,
+              ClientCall<ReqT, RespT> clientCall) {
+        this.serverCallListener = new RequestProxy(clientCall);
+        this.clientCallListener = new ResponseProxy(serverCall);
+    }
+
+    /**
+     * Request proxy to delegate client call.
+     */
+    private class RequestProxy extends ServerCall.Listener<ReqT> {
+
+        private final ClientCall<ReqT, ?> clientCall;
+        private boolean needToRequest;
+
+        public RequestProxy(ClientCall<ReqT, ?> clientCall) {
+            this.clientCall = clientCall;
+        }
+
+        @Override
+        public void onMessage(ReqT message) {
+            clientCall.sendMessage(message);
+            synchronized (this) {
+                if (clientCall.isReady()) {
+                    clientCallListener.serverCall.request(1);
+                } else {
+                    needToRequest = true;
+                }
+            }
+        }
+
+        @Override
+        public void onHalfClose() {
+            clientCall.halfClose();
+        }
+
+        @Override
+        public void onCancel() {
+            clientCall.cancel("Server cancelled", null);
+        }
+
+        @Override
+        public void onReady() {
+            clientCallListener.onServerReady();
+        }
+
+        synchronized void onClientReady() {
+            if (needToRequest) {
+                clientCallListener.serverCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+    /**
+     * Response proxy to delegate server call.
+     */
+    private class ResponseProxy extends ClientCall.Listener<RespT> {
+
+        private final ServerCall<?, RespT> serverCall;
+        private boolean needToRequest;
+
+        public ResponseProxy(ServerCall<?, RespT> serverCall) {
+            this.serverCall = serverCall;
+        }
+
+        @Override
+        public void onHeaders(Metadata headers) {
+            serverCall.sendHeaders(headers);
+        }
+
+        @Override
+        public void onMessage(RespT message) {
+            serverCall.sendMessage(message);
+            synchronized (this) {
+                if (serverCall.isReady()) {
+                    serverCallListener.clientCall.request(1);
+                } else {
+                    needToRequest = true;
+                }
+            }
+        }
+
+        @Override
+        public void onClose(Status status, Metadata trailers) {
+            serverCall.close(status, trailers);
+        }
+
+        @Override
+        public void onReady() {
+            serverCallListener.onClientReady();
+        }
+
+        synchronized void onServerReady() {
+            if (needToRequest) {
+                serverCallListener.clientCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
new file mode 100644
index 000000000..94cebb1c4
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.CallOptions;
+import io.grpc.HandlerRegistry;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.common.grpc.netty.IdentityInputStreamMarshaller;
+
+/**
+ * Registry for proxying grpc services.
+ */
+public class ProxyHandlerRegistry extends HandlerRegistry {
+
+    private final Map<String, ServerMethodDefinition<?, ?>> methods;
+
+    private ProxyHandlerRegistry(Map<String, ServerMethodDefinition<?, ?>> methods) {
+        this.methods = methods;
+    }
+
+    @Nullable
+    @Override
+    public ServerMethodDefinition<?, ?> lookupMethod(String methodName,
+                                                     @Nullable String authority) {
+        return methods.get(methodName);
+    }
+
+    private static ServerMethodDefinition<?, ?> createProxyServerMethodDefinition(
+            MethodDescriptor<?, ?> methodDesc,
+            ServerCallHandler<InputStream, InputStream> handler) {
+        MethodDescriptor<InputStream, InputStream> methodDescriptor = MethodDescriptor.newBuilder(
+            IdentityInputStreamMarshaller.of(), IdentityInputStreamMarshaller.of())
+            .setFullMethodName(methodDesc.getFullMethodName())
+            .setType(methodDesc.getType())
+            .setIdempotent(methodDesc.isIdempotent())
+            .setSafe(methodDesc.isSafe())
+            .build();
+        return ServerMethodDefinition.create(methodDescriptor, handler);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build the handler registry.
+     */
+    public static class Builder {
+
+        // store per-service first, to make sure services are added/replaced atomically.
+        private final HashMap<String, ServerServiceDefinition> services =
+            new LinkedHashMap<>();
+        private ChannelFinder finder;
+
+        /**
+         * Add the service to this grpc handler registry.
+         *
+         * @param service grpc service definition
+         * @return registry builder
+         */
+        public Builder addService(ServerServiceDefinition service) {
+            services.put(
+                service.getServiceDescriptor().getName(),
+                service);
+            return this;
+        }
+
+        /**
+         * Register a channel finder for proxying server calls.
+         *
+         * @param finder channel finder
+         * @return registry builder
+         */
+        public Builder setChannelFinder(ChannelFinder finder) {
+            this.finder = finder;
+            return this;
+        }
+
+        /**
+         * Build the proxy handler registry.
+         *
+         * @return the proxy handler registry
+         */
+        public ProxyHandlerRegistry build() {
+            checkNotNull(finder, "No channel finder defined");
+
+            ProxyServerCallHandler<InputStream, InputStream> proxyHandler =
+                new ProxyServerCallHandler<>(finder, CallOptions.DEFAULT);
+
+            Map<String, ServerMethodDefinition<?, ?>> methods = new HashMap<>();
+            for (ServerServiceDefinition service : services.values()) {
+                for (ServerMethodDefinition<?, ?> method : service.getMethods()) {
+                    String methodName = method.getMethodDescriptor().getFullMethodName();
+                    methods.put(
+                        methodName,
+                        createProxyServerMethodDefinition(
+                            method.getMethodDescriptor(),
+                            proxyHandler)
+                    );
+                }
+            }
+            return new ProxyHandlerRegistry(
+                Collections.unmodifiableMap(methods));
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
new file mode 100644
index 000000000..a6914979a
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+
+/**
+ * Server call handler that proxies each call to the channel found by a {@link ChannelFinder}.
+ */
+class ProxyServerCallHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
+
+    private final ChannelFinder finder;
+    private final CallOptions callOptions;
+
+    ProxyServerCallHandler(ChannelFinder finder,
+                           CallOptions callOptions) {
+        this.finder = finder;
+        this.callOptions = callOptions;
+    }
+
+    @Override
+    public Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata headers) {
+        Channel channel = finder.findChannel(serverCall, headers);
+        ClientCall<ReqT, RespT> clientCall = channel.newCall(
+            serverCall.getMethodDescriptor(), callOptions);
+        ProxyCall<ReqT, RespT> proxyCall = new ProxyCall<>(
+            serverCall,
+            clientCall);
+        clientCall.start(proxyCall.getClientCallListener(), headers);
+        serverCall.request(1);
+        clientCall.request(1);
+        return proxyCall.getServerCallListener();
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
new file mode 100644
index 000000000..7e023f518
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Grpc Utils for proxying grpc requests.
+ */
+package org.apache.bookkeeper.common.grpc.proxy;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
new file mode 100644
index 000000000..730cc6f0d
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link IdentityBinaryMarshaller} passes byte arrays through untouched.
+ */
+public class IdentityBinaryMarshallerTest {
+
+    /**
+     * Both directions must hand back the very same array instance with unmodified content.
+     */
+    @Test
+    public void testParseAndToBytes() {
+        byte[] payload = new byte[32];
+        ThreadLocalRandom.current().nextBytes(payload);
+        // snapshot taken up front so an in-place modification by the marshaller would be noticed
+        byte[] expected = payload.clone();
+
+        // serialize direction: same instance, same content
+        byte[] marshalled = IdentityBinaryMarshaller.of().toBytes(payload);
+        assertSame(payload, marshalled);
+        assertArrayEquals(expected, marshalled);
+
+        // deserialize direction: same instance, same content
+        byte[] parsed = IdentityBinaryMarshaller.of().parseBytes(payload);
+        assertSame(payload, parsed);
+        assertArrayEquals(expected, parsed);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
new file mode 100644
index 000000000..8cafc2f5f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link IdentityInputStreamMarshaller} returns the stream it was given.
+ */
+public class IdentityInputStreamMarshallerTest {
+
+    /**
+     * {@code stream} must return the same instance that was passed in.
+     */
+    @Test
+    public void testStream() {
+        final InputStream input = mock(InputStream.class);
+        final InputStream streamed = IdentityInputStreamMarshaller.of().stream(input);
+        assertSame(input, streamed);
+    }
+
+    /**
+     * {@code parse} must return the same instance that was passed in.
+     */
+    @Test
+    public void testParse() {
+        final InputStream input = mock(InputStream.class);
+        final InputStream parsed = IdentityInputStreamMarshaller.of().parse(input);
+        assertSame(input, parsed);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
new file mode 100644
index 000000000..e4dccb54e
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Round-trip test for {@link LongBinaryMarshaller}.
+ */
+public class LongBinaryMarshallerTest {
+
+    /**
+     * Encoding must match {@link Bytes#toBytes(long)} and decoding must restore the original value.
+     */
+    @Test
+    public void testParseAndToBytes() {
+        final long original = System.currentTimeMillis();
+
+        final byte[] encoded = LongBinaryMarshaller.of().toBytes(original);
+        assertArrayEquals(Bytes.toBytes(original), encoded);
+
+        final long decoded = LongBinaryMarshaller.of().parseBytes(encoded);
+        assertEquals(original, decoded);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
new file mode 100644
index 000000000..4611efea4
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Runs the shared {@link PingPongServiceTestBase} cases directly against the grpc service,
+ * without a reverse proxy in between.
+ *
+ * <p>This validates the shared test cases themselves, so that they can be trusted when they are
+ * reused to test the reverse proxy in {@link ProxyPingPongServiceTest}.
+ */
+public class DirectPingPongServiceTest extends PingPongServiceTestBase {
+    public DirectPingPongServiceTest() {
+        // false: do not put the reverse proxy in front of the service
+        super(false);
+    }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
new file mode 100644
index 000000000..ce3dcfd75
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import lombok.Data;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.ExceptionalFunction;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test grpc reverse proxy using {@link org.apache.bookkeeper.tests.rpc.PingPongService}.
+ */
+public abstract class PingPongServiceTestBase {
+
+    private static final int NUM_PONGS_PER_PING = 10;
+    private static final String SERVICE_NAME = "pingpong";
+
+    private final boolean useReverseProxy;
+
+    protected Server realServer;
+    protected Server proxyServer;
+    protected PingPongService service;
+    protected ManagedChannel proxyChannel;
+    protected ManagedChannel clientChannel;
+    protected PingPongServiceStub client;
+
+    /**
+     * @param useReverseProxy whether {@link #setup()} places a proxy server in front of the real server
+     */
+    PingPongServiceTestBase(boolean useReverseProxy) {
+        this.useReverseProxy = useReverseProxy;
+    }
+
+    /**
+     * Starts the in-process servers and channels used by a test.
+     *
+     * <p>The real server is always started. With the reverse proxy enabled it is registered as
+     * {@code "proxy-" + SERVICE_NAME} and a proxy server is registered as {@code SERVICE_NAME};
+     * otherwise the real server itself is registered as {@code SERVICE_NAME}. The client channel
+     * always connects to {@code SERVICE_NAME}.
+     */
+    @Before
+    public void setup() throws Exception {
+        service = new PingPongService(NUM_PONGS_PER_PING);
+        final ServerServiceDefinition serviceDef = service.bindService();
+
+        // the client always dials SERVICE_NAME, so the real server moves aside when proxied
+        final String realServerName = useReverseProxy ? "proxy-" + SERVICE_NAME : SERVICE_NAME;
+
+        // build a real server
+        final MutableHandlerRegistry realRegistry = new MutableHandlerRegistry();
+        realServer = InProcessServerBuilder
+            .forName(realServerName)
+            .fallbackHandlerRegistry(realRegistry)
+            .directExecutor()
+            .build()
+            .start();
+        realRegistry.addService(serviceDef);
+
+        if (!useReverseProxy) {
+            // no proxy in between: the "proxy" server is simply the real one
+            proxyServer = realServer;
+        } else {
+            proxyChannel = InProcessChannelBuilder.forName(realServerName)
+                .usePlaintext(false)
+                .build();
+
+            // every proxied call is forwarded over the single channel to the real server
+            final ProxyHandlerRegistry proxyRegistry = ProxyHandlerRegistry.newBuilder()
+                .addService(serviceDef)
+                .setChannelFinder((serverCall, header) -> proxyChannel)
+                .build();
+            proxyServer = InProcessServerBuilder
+                .forName(SERVICE_NAME)
+                .fallbackHandlerRegistry(proxyRegistry)
+                .directExecutor()
+                .build()
+                .start();
+        }
+
+        clientChannel = InProcessChannelBuilder.forName(SERVICE_NAME)
+            .usePlaintext(false)
+            .build();
+        client = PingPongServiceGrpc.newStub(clientChannel);
+    }
+
+    /**
+     * Shuts down whatever {@link #setup()} managed to create, client side first.
+     */
+    @After
+    public void teardown() throws Exception {
+        if (clientChannel != null) {
+            clientChannel.shutdown();
+        }
+        if (proxyServer != null) {
+            proxyServer.shutdown();
+        }
+        if (proxyChannel != null) {
+            proxyChannel.shutdown();
+        }
+        // without the reverse proxy both fields refer to the same server, which is already shut down
+        final boolean realServerIsSeparate = realServer != proxyServer;
+        if (realServer != null && realServerIsSeparate) {
+            realServer.shutdown();
+        }
+    }
+
+
+    /**
+     * One ping in, one pong out.
+     */
+    @Test
+    public void testUnary() {
+        final long seq = ThreadLocalRandom.current().nextLong();
+        final PingRequest ping = PingRequest.newBuilder().setSequence(seq).build();
+
+        final PingPongServiceBlockingStub blockingClient = PingPongServiceGrpc.newBlockingStub(clientChannel);
+        final PongResponse pong = blockingClient.pingPong(ping);
+
+        assertEquals(seq, pong.getLastSequence());
+        assertEquals(1, pong.getNumPingReceived());
+        assertEquals(0, pong.getSlotId());
+    }
+
+    /**
+     * One ping in, a stream of {@code NUM_PONGS_PER_PING} pongs out.
+     *
+     * <p>Each pong must echo the ping's sequence and carry its position in the stream as slot id.
+     */
+    @Test
+    public void testServerStreaming() {
+        PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+        long sequence = ThreadLocalRandom.current().nextLong(100000);
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        Iterator<PongResponse> respIter = clientBlocking.lotsOfPongs(request);
+        int count = 0;
+        while (respIter.hasNext()) {
+            PongResponse resp = respIter.next();
+            assertEquals(sequence, resp.getLastSequence());
+            assertEquals(1, resp.getNumPingReceived());
+            assertEquals(count, resp.getSlotId());
+            ++count;
+        }
+        // without this check an empty or truncated stream would pass the test
+        assertEquals(NUM_PONGS_PER_PING, count);
+    }
+
+    /**
+     * A stream of pings in, a single pong out that summarizes all of them.
+     */
+    @Test
+    public void testClientStreaming() throws Exception {
+        final int pingTotal = 100;
+        final long firstSequence = ThreadLocalRandom.current().nextLong(100000);
+        final LinkedBlockingQueue<PongResponse> pongs = new LinkedBlockingQueue<>();
+        final CompletableFuture<Void> callDone = new CompletableFuture<>();
+
+        final StreamObserver<PongResponse> pongObserver = new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse pong) {
+                pongs.offer(pong);
+            }
+
+            @Override
+            public void onError(Throwable cause) {
+                callDone.completeExceptionally(cause);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(callDone, null);
+            }
+        };
+        final StreamObserver<PingRequest> pinger = client.lotsOfPings(pongObserver);
+
+        // consecutive sequences, so the last one is firstSequence + pingTotal - 1
+        for (int offset = 0; offset < pingTotal; offset++) {
+            pinger.onNext(PingRequest.newBuilder()
+                .setSequence(firstSequence + offset)
+                .build());
+        }
+        pinger.onCompleted();
+
+        // block until the call has been closed by the server
+        result(callDone);
+
+        assertEquals(1, pongs.size());
+
+        final PongResponse pong = pongs.take();
+        assertEquals(firstSequence + pingTotal - 1, pong.getLastSequence());
+        assertEquals(pingTotal, pong.getNumPingReceived());
+        assertEquals(0, pong.getSlotId());
+    }
+
+    /**
+     * A stream of pings in, one pong per ping out, in the same order.
+     */
+    @Test
+    public void testBidiStreaming() throws Exception {
+        final int pingTotal = 100;
+
+        final LinkedBlockingQueue<PongResponse> pongs = new LinkedBlockingQueue<>();
+        final CompletableFuture<Void> callDone = new CompletableFuture<>();
+        final StreamObserver<PongResponse> pongObserver = new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse pong) {
+                pongs.offer(pong);
+            }
+
+            @Override
+            public void onError(Throwable cause) {
+                callDone.completeExceptionally(cause);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(callDone, null);
+            }
+        };
+        final StreamObserver<PingRequest> pinger = client.bidiPingPong(pongObserver);
+
+        // remember every ping in send order so the pongs can be matched against them
+        final LinkedBlockingQueue<PingRequest> sentPings = new LinkedBlockingQueue<>();
+        for (int i = 0; i < pingTotal; i++) {
+            final PingRequest ping = PingRequest.newBuilder()
+                .setSequence(ThreadLocalRandom.current().nextLong(100000))
+                .build();
+            sentPings.put(ping);
+            pinger.onNext(ping);
+        }
+        pinger.onCompleted();
+
+        // block until the call has been closed by the server
+        result(callDone);
+
+        assertEquals(pingTotal, pongs.size());
+
+        int matched = 0;
+        for (PingRequest ping : sentPings) {
+            final PongResponse pong = pongs.take();
+            matched++;
+
+            assertEquals(ping.getSequence(), pong.getLastSequence());
+            assertEquals(matched, pong.getNumPingReceived());
+            assertEquals(0, pong.getSlotId());
+        }
+        assertNull(pongs.poll());
+        assertEquals(pingTotal, matched);
+    }
+
+    /**
+     * Worker used by {@link #testMixed()}: waits for the start signal, then invokes {@code func}
+     * a random number of times (between 10 and 99).
+     *
+     * <p>The first failure seen by any runner is kept in {@code exceptionHolder}; once one is
+     * recorded, the remaining iterations are skipped. {@code doneLatch} is counted down exactly
+     * once per runner, whatever the outcome, so the waiting test can never hang on a failed runner.
+     */
+    @Data
+    static class Runner implements Runnable {
+
+        private final CountDownLatch startLatch;
+        private final CountDownLatch doneLatch;
+        private final AtomicReference<Exception> exceptionHolder;
+        private final ExceptionalFunction<Void, Void> func;
+
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+                int numIters = ThreadLocalRandom.current().nextInt(10, 100);
+                // range(0, n) iterates n times; IntStream.of(n) would be a single-element stream
+                IntStream.range(0, numIters).forEach(idx -> {
+                    if (null != exceptionHolder.get()) {
+                        // a runner already failed; skip the remaining iterations
+                        return;
+                    }
+                    try {
+                        func.apply(null);
+                    } catch (Exception e) {
+                        // keep the first failure only
+                        exceptionHolder.compareAndSet(null, e);
+                    } catch (AssertionError e) {
+                        // failed assertions are Errors; wrap them so the failure is reported
+                        // instead of disappearing inside the executor
+                        exceptionHolder.compareAndSet(null, new RuntimeException(e));
+                    }
+                });
+            } catch (InterruptedException e) {
+                // restore the interrupt flag and report that this runner never ran
+                Thread.currentThread().interrupt();
+                exceptionHolder.compareAndSet(null, e);
+            } finally {
+                doneLatch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Runs the four rpc flavours (unary, client streaming, server streaming, bidi streaming)
+     * concurrently against the same channel, one {@link Runner} per flavour.
+     */
+    @Test
+    public void testMixed() throws Exception {
+        int numTypes = 4;
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numTypes);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        ExecutorService executor = Executors.newFixedThreadPool(numTypes);
+        try {
+            // start unary tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testUnary();
+                return null;
+            }));
+
+            // start client streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testClientStreaming();
+                return null;
+            }));
+
+            // start server streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testServerStreaming();
+                return null;
+            }));
+
+            // start bidi streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testBidiStreaming();
+                return null;
+            }));
+
+            // start the tests
+            startLatch.countDown();
+
+            // wait for tests to complete
+            doneLatch.await();
+        } finally {
+            // release the pool threads even if the test fails or is interrupted
+            executor.shutdownNow();
+        }
+
+        // make sure all succeed
+        assertNull("Exception found : " + exception.get(), exception.get());
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
new file mode 100644
index 000000000..a099717cf
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Runs the shared {@link PingPongServiceTestBase} cases with the reverse grpc proxy enabled.
+ */
+public class ProxyPingPongServiceTest extends PingPongServiceTestBase {
+
+    public ProxyPingPongServiceTest() {
+        // true: put the reverse proxy in front of the service
+        super(true);
+    }
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index f110d5201..979f877d1 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -31,6 +31,7 @@
 
   <modules>
     <module>common</module>
+    <module>tests-common</module>
     <module>statelib</module>
     <module>api</module>
     <module>proto</module>
diff --git a/stream/proto/pom.xml b/stream/proto/pom.xml
index de140d928..0d8f690e2 100644
--- a/stream/proto/pom.xml
+++ b/stream/proto/pom.xml
@@ -42,6 +42,12 @@
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
index cf9b91706..3eb440a1a 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/stream/tests-common/pom.xml b/stream/tests-common/pom.xml
new file mode 100644
index 000000000..0f6cc105e
--- /dev/null
+++ b/stream/tests-common/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>stream-storage-parent</artifactId>
+    <version>4.8.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <groupId>org.apache.bookkeeper.tests</groupId>
+  <artifactId>stream-storage-tests-common</artifactId>
+  <name>Apache BookKeeper :: Stream Storage :: Common Classes for Tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>${os-maven-plugin.version}</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+        <configuration>
+          <source>${javac.target}</source>
+          <target>${javac.target}</target>
+          <compilerArgs>
+            <compilerArg>-Xlint:unchecked</compilerArg>
+            <!-- https://issues.apache.org/jira/browse/MCOMPILER-205 -->
+            <compilerArg>-Xpkginfo:always</compilerArg>
+          </compilerArgs>
+          <showDeprecation>false</showDeprecation>
+          <showWarnings>false</showWarnings>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>${protobuf-maven-plugin.version}</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>
+          <checkStaleness>true</checkStaleness>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${maven-jar-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
new file mode 100644
index 000000000..4a947762d
--- /dev/null
+++ b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.rpc;
+
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceImplBase;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+
+/**
+ * An implementation of the ping pong service used for testing.
+ */
+@Slf4j
+public class PingPongService extends PingPongServiceImplBase {
+
+    private final int streamPongSize;
+
+    public PingPongService(int streamPongSize) {
+        this.streamPongSize = streamPongSize;
+    }
+
+    @Override
+    public void pingPong(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        responseObserver.onNext(PongResponse.newBuilder()
+            .setLastSequence(request.getSequence())
+            .setNumPingReceived(1)
+            .setSlotId(0)
+            .build());
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public StreamObserver<PingRequest> lotsOfPings(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int pingCount = 0;
+            long lastSequence = -1L;
+
+            @Override
+            public void onNext(PingRequest value) {
+                pingCount++;
+                lastSequence = value.getSequence();
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("Failed on receiving stream of pings", t);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onNext(PongResponse.newBuilder()
+                    .setNumPingReceived(pingCount)
+                    .setLastSequence(lastSequence)
+                    .setSlotId(0)
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+
+    @Override
+    public void lotsOfPongs(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        long sequence = request.getSequence();
+        for (int i = 0; i < streamPongSize; i++) {
+            responseObserver.onNext(PongResponse.newBuilder()
+                .setLastSequence(sequence)
+                .setNumPingReceived(1)
+                .setSlotId(i)
+                .build());
+        }
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public StreamObserver<PingRequest> bidiPingPong(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int pingCount = 0;
+
+            @Override
+            public void onNext(PingRequest ping) {
+                pingCount++;
+                responseObserver.onNext(PongResponse.newBuilder()
+                    .setLastSequence(ping.getSequence())
+                    .setNumPingReceived(pingCount)
+                    .setSlotId(0)
+                    .build());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                responseObserver.onError(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/stream/proto/src/main/proto/proto2_coder_test_messages.proto b/stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
similarity index 100%
rename from stream/proto/src/main/proto/proto2_coder_test_messages.proto
rename to stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
diff --git a/stream/tests-common/src/main/proto/rpc.proto b/stream/tests-common/src/main/proto/rpc.proto
new file mode 100644
index 000000000..8bda3690e
--- /dev/null
+++ b/stream/tests-common/src/main/proto/rpc.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package bookkeeper.tests.proto.rpc;
+
+option java_multiple_files = true;
+option java_package = "org.bookkeeper.tests.proto.rpc";
+
+// A single ping sent by the client.
+message PingRequest {
+    // sequence number chosen by the sender.
+    int64 sequence = 1;
+}
+
+// The server's answer to one or more pings.
+message PongResponse {
+    // sequence of the most recent ping covered by this response.
+    int64 last_sequence = 1;
+    // number of pings received on the call when this response was produced.
+    int32 num_ping_received = 2;
+    // the slot id in this stream of pong responses.
+    int32 slot_id = 3;
+}
+
+// Test service exposing one rpc per grpc call type.
+service PingPongService {
+
+    // unary: one ping, one pong.
+    rpc PingPong(PingRequest) returns (PongResponse) {}
+
+    // client streaming: a stream of pings, one pong.
+    rpc LotsOfPings(stream PingRequest) returns (PongResponse) {}
+
+    // server streaming: one ping, a stream of pongs.
+    rpc LotsOfPongs(PingRequest) returns (stream PongResponse) {}
+
+    // bidirectional streaming: a stream of pings, a stream of pongs.
+    rpc BidiPingPong(stream PingRequest) returns (stream PongResponse) {}
+
+}
+
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services