You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 21:01:30 UTC

[bookkeeper] 09/10: [TABLE SERVICE] client interceptor and storage container grpc proxy

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit c9b32d7d3cc352dd6fdd998e91ba15f206b7a568
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri May 25 01:02:06 2018 -0700

    [TABLE SERVICE] client interceptor and storage container grpc proxy
    
    Descriptions of the changes in this PR:
    
    This is a subsequent change after apache/bookkeeper#1428.
    
    *Motivation*
    
    Currently almost every grpc request is wrapped into a `StorageContainerRequest` and its response
    is wrapped into a `StorageContainerResponse`. This makes it a bit complicated to add new grpc
    services.
    
    *Changes*
    
    To simplify things, this PR introduces two pieces of functionality that make dispatching container requests/responses easier.
    
    1) *StorageContainerClientInterceptor*: A grpc `ClientInterceptor` that stamps container information (currently the `scId`) into the requests' metadata before sending the requests over the wire.
    
    2) A simple grpc reverse proxy to dispatch grpc requests to the channels provided by a `ChannelFinder`.
    
    *Tests*
    
    1. Existing unit tests covered client interceptor changes.
    2. Introduced a `stream-storage-tests-common` module to include common classes that would be used for testing.
    3. Introduced a `PingPongService` for testing reverse proxy : unary/client-streaming/server-streaming/bidi-streaming.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1430 from sijie/interceptor_container_requests
---
 .../clients/impl/channel/StorageServerChannel.java |  30 +-
 .../impl/container/StorageContainerChannel.java    |   6 +-
 .../StorageContainerClientInterceptor.java         |  60 ++++
 stream/common/pom.xml                              |   6 +
 .../grpc/netty/IdentityBinaryMarshaller.java       |  45 +++
 .../grpc/netty/IdentityInputStreamMarshaller.java  |  46 +++
 .../common/grpc/netty/LongBinaryMarshaller.java    |  46 +++
 .../bookkeeper/common/grpc/netty/package-info.java |  22 ++
 .../bookkeeper/common/grpc/package-info.java       |  22 ++
 .../common/grpc/proxy/ChannelFinder.java           |  40 +++
 .../bookkeeper/common/grpc/proxy/ProxyCall.java    | 138 +++++++++
 .../common/grpc/proxy/ProxyHandlerRegistry.java    | 135 ++++++++
 .../common/grpc/proxy/ProxyServerCallHandler.java  |  56 ++++
 .../bookkeeper/common/grpc/proxy/package-info.java |  22 ++
 .../grpc/netty/IdentityBinaryMarshallerTest.java   |  52 ++++
 .../netty/IdentityInputStreamMarshallerTest.java   |  44 +++
 .../grpc/netty/LongBinaryMarshallerTest.java       |  41 +++
 .../grpc/proxy/DirectPingPongServiceTest.java      |  31 ++
 .../common/grpc/proxy/PingPongServiceTestBase.java | 345 +++++++++++++++++++++
 .../grpc/proxy/ProxyPingPongServiceTest.java       |  29 ++
 stream/pom.xml                                     |   1 +
 stream/proto/pom.xml                               |   6 +
 .../TestStorageContainerResponseHandler.java       |   2 +-
 stream/{proto => tests-common}/pom.xml             |  20 +-
 .../bookkeeper/tests/rpc/PingPongService.java      | 119 +++++++
 .../main/proto/proto2_coder_test_messages.proto    |   0
 stream/tests-common/src/main/proto/rpc.proto       |  48 +++
 27 files changed, 1396 insertions(+), 16 deletions(-)

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 34dc957..7e1e022 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -19,12 +19,15 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -53,7 +56,7 @@ public class StorageServerChannel implements AutoCloseable {
     }
 
     private final Optional<String> token;
-    private final ManagedChannel channel;
+    private final Channel channel;
 
     @GuardedBy("this")
     private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +90,11 @@ public class StorageServerChannel implements AutoCloseable {
     @VisibleForTesting
     public StorageServerChannel(ManagedChannel channel,
                                 Optional<String> token) {
+        this((Channel) channel, token);
+    }
+
+    protected StorageServerChannel(Channel channel,
+                                   Optional<String> token) {
         this.token = token;
         this.channel = channel;
     }
@@ -127,8 +135,26 @@ public class StorageServerChannel implements AutoCloseable {
         return kvService;
     }
 
+    /**
+     * Derive a server channel whose calls are stamped with the metadata of a storage container.
+     *
+     * @param scId storage container id
+     * @return a new server channel that wraps this channel with a container interceptor.
+     */
+    public StorageServerChannel intercept(long scId) {
+        StorageContainerClientInterceptor scInterceptor =
+            new StorageContainerClientInterceptor(scId);
+        Channel stampedChannel = ClientInterceptors.intercept(this.channel, scInterceptor);
+
+        return new StorageServerChannel(
+            stampedChannel,
+            this.token);
+    }
+
     @Override
     public void close() {
-        channel.shutdown();
+        if (channel instanceof ManagedChannel) {
+            ((ManagedChannel) channel).shutdown();
+        }
     }
 }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 8635e0f..a20d7c9 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -190,9 +190,13 @@ public class StorageContainerChannel {
             }
             return;
         }
+
+        // intercept the storage server channel with additional sc metadata
+        StorageServerChannel interceptedChannel = serverChannel.intercept(scId);
+
         // update the future
         synchronized (this) {
-            rsChannelFuture.complete(serverChannel);
+            rsChannelFuture.complete(interceptedChannel);
         }
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
new file mode 100644
index 0000000..284431e
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.container;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+
+/**
+ * A client interceptor that stamps the storage container id onto the headers of outgoing calls.
+ */
+public class StorageContainerClientInterceptor implements ClientInterceptor {
+
+    // grpc only accepts binary-marshalled metadata keys whose names end with the "-bin" suffix.
+    private static final String SC_ID_KEY = "SC_ID" + Metadata.BINARY_HEADER_SUFFIX;
+
+    private final long scId;
+    private final Metadata.Key<Long> scIdKey;
+
+    /** Create an interceptor that attaches {@code scId} to every call made through it. */
+    public StorageContainerClientInterceptor(long scId) {
+        this.scId = scId;
+        this.scIdKey = Metadata.Key.of(SC_ID_KEY, LongBinaryMarshaller.of());
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                               CallOptions callOptions,
+                                                               Channel next) {
+        return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+            @Override
+            protected void checkedStart(Listener<RespT> responseListener,
+                                        Metadata headers) throws Exception {
+                headers.put(scIdKey, scId); // stamp the container id before the call is started
+                delegate().start(responseListener, headers);
+            }
+        };
+    }
+}
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 43fdc36..50e4edd 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -40,5 +40,11 @@
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
new file mode 100644
index 0000000..8b1c939
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+
+/**
+ * Pass-through {@link BinaryMarshaller} for byte arrays; {@link #of()} hands out the shared instance.
+ */
+public class IdentityBinaryMarshaller implements BinaryMarshaller<byte[]> {
+
+    private static final IdentityBinaryMarshaller INSTANCE = new IdentityBinaryMarshaller();
+
+    private IdentityBinaryMarshaller() {}
+
+    public static IdentityBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public byte[] parseBytes(byte[] serialized) {
+        return serialized; // the very same array, no copy
+    }
+
+    @Override
+    public byte[] toBytes(byte[] value) {
+        return value; // the very same array, no copy
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
new file mode 100644
index 0000000..fd54b5e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.MethodDescriptor;
+import java.io.InputStream;
+
+/**
+ * Pass-through {@link MethodDescriptor.Marshaller} for {@link InputStream}s; use {@link #of()} to get it.
+ */
+public class IdentityInputStreamMarshaller implements MethodDescriptor.Marshaller<InputStream> {
+
+    private static final IdentityInputStreamMarshaller INSTANCE = new IdentityInputStreamMarshaller();
+
+    private IdentityInputStreamMarshaller() {}
+
+    public static IdentityInputStreamMarshaller of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public InputStream parse(InputStream stream) {
+        return stream; // handed on untouched
+    }
+
+    @Override
+    public InputStream stream(InputStream value) {
+        return value; // handed on untouched
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
new file mode 100644
index 0000000..92194f1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+import org.apache.bookkeeper.common.util.Bytes;
+
+/**
+ * {@link BinaryMarshaller} for {@link Long} values, delegating to {@link Bytes}; get it via {@link #of()}.
+ */
+public class LongBinaryMarshaller implements BinaryMarshaller<Long> {
+
+    private static final LongBinaryMarshaller INSTANCE = new LongBinaryMarshaller();
+
+    private LongBinaryMarshaller() {}
+
+    public static LongBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Long parseBytes(byte[] serialized) {
+        return Bytes.toLong(serialized, 0); // decode from the start of the array
+    }
+
+    @Override
+    public byte[] toBytes(Long value) {
+        return Bytes.toBytes(value);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
new file mode 100644
index 0000000..3e69f68
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Netty Utils.
+ */
+package org.apache.bookkeeper.common.grpc.netty;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
new file mode 100644
index 0000000..d943bdf
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Utils.
+ */
+package org.apache.bookkeeper.common.grpc;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
new file mode 100644
index 0000000..698c380
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+/**
+ * Picks the grpc {@link io.grpc.Channel} that a proxied server call is routed to.
+ */
+public interface ChannelFinder {
+
+    /**
+     * Find the channel on which the proxy opens the outgoing client call for a server call.
+     *
+     * @param serverCall the incoming server call that is about to be proxied
+     * @param headers request metadata received with the server call
+     * @return channel to route the server call.
+     */
+    Channel findChannel(ServerCall<?, ?> serverCall,
+                        Metadata headers);
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
new file mode 100644
index 0000000..35f6d2d
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import lombok.Getter;
+
+/**
+ * Proxy a grpc call: requests from the server call go to the client call, responses go back the other way.
+ */
+@Getter
+class ProxyCall<ReqT, RespT> {
+
+    private final RequestProxy serverCallListener; // listens on the server call, drives the client call
+    private final ResponseProxy clientCallListener; // listens on the client call, drives the server call
+
+    ProxyCall(ServerCall<ReqT, RespT> serverCall,
+              ClientCall<ReqT, RespT> clientCall) {
+        this.serverCallListener = new RequestProxy(clientCall);
+        this.clientCallListener = new ResponseProxy(serverCall);
+    }
+
+    /**
+     * Listener of the server call: relays messages, half-close and cancellation to the client call.
+     */
+    private class RequestProxy extends ServerCall.Listener<ReqT> {
+
+        private final ClientCall<ReqT, ?> clientCall;
+        private boolean needToRequest; // true while a pull of the next request is being held back
+
+        public RequestProxy(ClientCall<ReqT, ?> clientCall) {
+            this.clientCall = clientCall;
+        }
+
+        @Override
+        public void onMessage(ReqT message) {
+            clientCall.sendMessage(message);
+            synchronized (this) { // same monitor as onClientReady(), which reads the flag
+                if (clientCall.isReady()) {
+                    clientCallListener.serverCall.request(1); // client call is ready: pull the next request
+                } else {
+                    needToRequest = true; // held back until onClientReady() runs
+                }
+            }
+        }
+
+        @Override
+        public void onHalfClose() {
+            clientCall.halfClose();
+        }
+
+        @Override
+        public void onCancel() {
+            clientCall.cancel("Server cancelled", null);
+        }
+
+        @Override
+        public void onReady() {
+            clientCallListener.onServerReady(); // lets the response side resume a held-back pull
+        }
+
+        synchronized void onClientReady() { // called from ResponseProxy.onReady()
+            if (needToRequest) {
+                clientCallListener.serverCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+    /**
+     * Listener of the client call: relays headers, messages and the closing status to the server call.
+     */
+    private class ResponseProxy extends ClientCall.Listener<RespT> {
+
+        private final ServerCall<?, RespT> serverCall;
+        private boolean needToRequest; // true while a pull of the next response is being held back
+
+        public ResponseProxy(ServerCall<?, RespT> serverCall) {
+            this.serverCall = serverCall;
+        }
+
+        @Override
+        public void onHeaders(Metadata headers) {
+            serverCall.sendHeaders(headers);
+        }
+
+        @Override
+        public void onMessage(RespT message) {
+            serverCall.sendMessage(message);
+            synchronized (this) { // same monitor as onServerReady(), which reads the flag
+                if (serverCall.isReady()) {
+                    serverCallListener.clientCall.request(1); // server call is ready: pull the next response
+                } else {
+                    needToRequest = true; // held back until onServerReady() runs
+                }
+            }
+        }
+
+        @Override
+        public void onClose(Status status, Metadata trailers) {
+            serverCall.close(status, trailers);
+        }
+
+        @Override
+        public void onReady() {
+            serverCallListener.onClientReady(); // lets the request side resume a held-back pull
+        }
+
+        synchronized void onServerReady() { // called from RequestProxy.onReady()
+            if (needToRequest) {
+                serverCallListener.clientCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
new file mode 100644
index 0000000..94cebb1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.CallOptions;
+import io.grpc.HandlerRegistry;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.common.grpc.netty.IdentityInputStreamMarshaller;
+
+/**
+ * A {@link HandlerRegistry} for proxying grpc services.
+ *
+ * <p>Every method of the added services is served by one shared proxy call handler, which
+ * resolves the target channel through the configured {@link ChannelFinder}.
+ */
+public class ProxyHandlerRegistry extends HandlerRegistry {
+
+    // proxy method definitions, keyed by full method name
+    private final Map<String, ServerMethodDefinition<?, ?>> methods;
+
+    private ProxyHandlerRegistry(Map<String, ServerMethodDefinition<?, ?>> methods) {
+        this.methods = methods;
+    }
+
+    /** Create an empty registry builder. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    @Nullable
+    @Override
+    public ServerMethodDefinition<?, ?> lookupMethod(String methodName,
+                                                     @Nullable String authority) {
+        return methods.get(methodName);
+    }
+
+    // Copy name, type and the idempotent/safe flags of a method; payloads use identity marshallers.
+    private static ServerMethodDefinition<?, ?> createProxyServerMethodDefinition(
+            MethodDescriptor<?, ?> methodDesc,
+            ServerCallHandler<InputStream, InputStream> handler) {
+        MethodDescriptor.Marshaller<InputStream> identity = IdentityInputStreamMarshaller.of();
+        MethodDescriptor<InputStream, InputStream> proxyDescriptor =
+            MethodDescriptor.newBuilder(identity, identity)
+                .setFullMethodName(methodDesc.getFullMethodName())
+                .setType(methodDesc.getType())
+                .setIdempotent(methodDesc.isIdempotent())
+                .setSafe(methodDesc.isSafe())
+                .build();
+        return ServerMethodDefinition.create(proxyDescriptor, handler);
+    }
+
+    /**
+     * Collects service definitions and a channel finder, then builds the registry.
+     */
+    public static class Builder {
+
+        // kept per service name, so adding a service again replaces all of its methods at once
+        private final Map<String, ServerServiceDefinition> services = new LinkedHashMap<>();
+        private ChannelFinder finder;
+
+        /**
+         * Add the service to this grpc handler registry.
+         *
+         * @param service grpc service definition
+         * @return registry builder
+         */
+        public Builder addService(ServerServiceDefinition service) {
+            String serviceName = service.getServiceDescriptor().getName();
+            services.put(serviceName, service);
+            return this;
+        }
+
+        /**
+         * Register the channel finder used for proxying server calls.
+         *
+         * @param finder channel finder
+         * @return registry builder
+         */
+        public Builder setChannelFinder(ChannelFinder finder) {
+            this.finder = finder;
+            return this;
+        }
+
+        /**
+         * Build the proxy handler registry.
+         *
+         * @return a registry holding one proxy method definition per method of the added services
+         */
+        public ProxyHandlerRegistry build() {
+            checkNotNull(finder, "No channel finder defined");
+
+            // every proxied method shares this one handler
+            ProxyServerCallHandler<InputStream, InputStream> sharedHandler =
+                new ProxyServerCallHandler<>(finder, CallOptions.DEFAULT);
+
+            Map<String, ServerMethodDefinition<?, ?>> proxied = new HashMap<>();
+            services.values().forEach(service -> service.getMethods().forEach(method -> {
+                MethodDescriptor<?, ?> descriptor = method.getMethodDescriptor();
+                proxied.put(
+                    descriptor.getFullMethodName(),
+                    createProxyServerMethodDefinition(descriptor, sharedHandler));
+            }));
+            return new ProxyHandlerRegistry(Collections.unmodifiableMap(proxied));
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
new file mode 100644
index 0000000..a691497
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+
+/**
+ * A {@link ServerCallHandler} that proxies each incoming server call to a client call
+ * created on the channel chosen by a {@link ChannelFinder}.
+ */
+class ProxyServerCallHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
+
+    // resolves the channel that an incoming call is forwarded to
+    private final ChannelFinder finder;
+    // call options applied to every client call created by this handler
+    private final CallOptions callOptions;
+
+    ProxyServerCallHandler(ChannelFinder finder,
+                           CallOptions callOptions) {
+        this.finder = finder;
+        this.callOptions = callOptions;
+    }
+
+    /**
+     * Start proxying a server call.
+     *
+     * <p>A client call for the same method descriptor is created on the channel returned by the
+     * finder, and both calls are tied together through a {@link ProxyCall}.
+     *
+     * @param serverCall the incoming server call
+     * @param headers the headers received with the incoming call
+     * @return the server-side listener of the proxy call
+     */
+    @Override
+    public Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata headers) {
+        Channel channel = finder.findChannel(serverCall, headers);
+        ClientCall<ReqT, RespT> clientCall = channel.newCall(
+            serverCall.getMethodDescriptor(), callOptions);
+        ProxyCall<ReqT, RespT> proxyCall = new ProxyCall<>(
+            serverCall,
+            clientCall);
+        // the incoming headers are handed to the client call as-is
+        clientCall.start(proxyCall.getClientCallListener(), headers);
+        // ask for the first message on both sides of the proxy
+        serverCall.request(1);
+        clientCall.request(1);
+        return proxyCall.getServerCallListener();
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
new file mode 100644
index 0000000..7e023f5
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Grpc Utils for proxying grpc requests.
+ */
+package org.apache.bookkeeper.common.grpc.proxy;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
new file mode 100644
index 0000000..730cc6f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityBinaryMarshaller}.
+ */
+public class IdentityBinaryMarshallerTest {
+
+    /**
+     * Both directions of the marshaller must hand back the very same array with unchanged content.
+     */
+    @Test
+    public void testParseAndToBytes() {
+        byte[] payload = new byte[32];
+        ThreadLocalRandom.current().nextBytes(payload);
+        // snapshot of the content, used to detect in-place modification
+        byte[] expected = payload.clone();
+
+        // serialization: same object, same content
+        byte[] marshalled = IdentityBinaryMarshaller.of().toBytes(payload);
+        assertSame(payload, marshalled);
+        assertArrayEquals(expected, marshalled);
+
+        // deserialization: same object, same content
+        byte[] parsed = IdentityBinaryMarshaller.of().parseBytes(payload);
+        assertSame(payload, parsed);
+        assertArrayEquals(expected, parsed);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
new file mode 100644
index 0000000..8cafc2f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityInputStreamMarshaller}.
+ */
+public class IdentityInputStreamMarshallerTest {
+
+    /**
+     * Streaming must return the given input stream itself.
+     */
+    @Test
+    public void testStream() {
+        InputStream input = mock(InputStream.class);
+        InputStream streamed = IdentityInputStreamMarshaller.of().stream(input);
+        assertSame(input, streamed);
+    }
+
+    /**
+     * Parsing must return the given input stream itself.
+     */
+    @Test
+    public void testParse() {
+        InputStream input = mock(InputStream.class);
+        InputStream parsed = IdentityInputStreamMarshaller.of().parse(input);
+        assertSame(input, parsed);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
new file mode 100644
index 0000000..e4dccb5
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Unit test {@link LongBinaryMarshaller}.
+ */
+public class LongBinaryMarshallerTest {
+
+    /**
+     * A long must serialize to the same bytes as {@link Bytes#toBytes(long)} and parse back unchanged.
+     */
+    @Test
+    public void testParseAndToBytes() {
+        long original = System.currentTimeMillis();
+
+        byte[] encoded = LongBinaryMarshaller.of().toBytes(original);
+        assertArrayEquals(Bytes.toBytes(original), encoded);
+
+        long decoded = LongBinaryMarshaller.of().parseBytes(encoded);
+        assertEquals(original, decoded);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
new file mode 100644
index 0000000..4611efe
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test PingPongService by directly accessing the grpc service.
+ *
+ * <p>This is to ensure the tests in {@link PingPongServiceTestBase} are correct to be used for testing
+ * reverse proxy in {@link ProxyPingPongServiceTest}.
+ */
+public class DirectPingPongServiceTest extends PingPongServiceTestBase {
+    public DirectPingPongServiceTest() {
+        // false: no reverse proxy, the client channel reaches the real server directly
+        super(false);
+    }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
new file mode 100644
index 0000000..ce3dcfd
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import lombok.Data;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.ExceptionalFunction;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test grpc reverse proxy using {@link org.apache.bookkeeper.tests.rpc.PingPongService}.
+ */
+public abstract class PingPongServiceTestBase {
+
+    private static final int NUM_PONGS_PER_PING = 10;
+    private static final String SERVICE_NAME = "pingpong";
+
+    private final boolean useReverseProxy;
+
+    protected Server realServer;
+    protected Server proxyServer;
+    protected PingPongService service;
+    protected ManagedChannel proxyChannel;
+    protected ManagedChannel clientChannel;
+    protected PingPongServiceStub client;
+
+    // useReverseProxy selects whether setup() places a proxy server in front of the real server
+    PingPongServiceTestBase(boolean useReverseProxy) {
+        this.useReverseProxy = useReverseProxy;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        service = new PingPongService(NUM_PONGS_PER_PING);
+        ServerServiceDefinition serviceDefinition = service.bindService();
+
+        // when proxying, the real server moves to a "proxy-" prefixed name so that
+        // the proxy server can be registered under the name the client connects to
+        String realServerName = useReverseProxy ? "proxy-" + SERVICE_NAME : SERVICE_NAME;
+
+        // build a real server
+        MutableHandlerRegistry realRegistry = new MutableHandlerRegistry();
+        realServer = InProcessServerBuilder.forName(realServerName)
+            .fallbackHandlerRegistry(realRegistry)
+            .directExecutor()
+            .build()
+            .start();
+        realRegistry.addService(serviceDefinition);
+
+        if (!useReverseProxy) {
+            // direct mode: the client talks to the real server
+            proxyServer = realServer;
+        } else {
+            // channel used by the proxy to reach the real server
+            proxyChannel = InProcessChannelBuilder.forName(realServerName)
+                .usePlaintext(false)
+                .build();
+
+            // every proxied call is forwarded over the same channel
+            ProxyHandlerRegistry proxyRegistry = ProxyHandlerRegistry.newBuilder()
+                .addService(serviceDefinition)
+                .setChannelFinder((serverCall, header) -> proxyChannel)
+                .build();
+            proxyServer = InProcessServerBuilder.forName(SERVICE_NAME)
+                .fallbackHandlerRegistry(proxyRegistry)
+                .directExecutor()
+                .build()
+                .start();
+        }
+
+        clientChannel = InProcessChannelBuilder.forName(SERVICE_NAME)
+            .usePlaintext(false)
+            .build();
+        client = PingPongServiceGrpc.newStub(clientChannel);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        // shut down from the client side towards the real server
+        if (clientChannel != null) {
+            clientChannel.shutdown();
+        }
+        if (proxyServer != null) {
+            proxyServer.shutdown();
+        }
+        if (proxyChannel != null) {
+            proxyChannel.shutdown();
+        }
+        // in direct mode proxyServer is the real server, which is already shut down above
+        if (realServer != null && realServer != proxyServer) {
+            realServer.shutdown();
+        }
+    }
+
+
+    @Test
+    public void testUnary() {
+        long expectedSequence = ThreadLocalRandom.current().nextLong();
+        PingPongServiceBlockingStub blockingStub = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+        // one ping in, exactly one pong out
+        PongResponse pong = blockingStub.pingPong(
+            PingRequest.newBuilder().setSequence(expectedSequence).build());
+
+        assertEquals(expectedSequence, pong.getLastSequence());
+        assertEquals(1, pong.getNumPingReceived());
+        assertEquals(0, pong.getSlotId());
+    }
+
+    /**
+     * Send one ping through the server-streaming call and verify every pong of the stream.
+     *
+     * <p>The service is created with {@code NUM_PONGS_PER_PING}, so exactly that many pongs are
+     * expected; their slot ids must match their position in the stream.
+     */
+    @Test
+    public void testServerStreaming() {
+        PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+        long sequence = ThreadLocalRandom.current().nextLong(100000);
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        Iterator<PongResponse> respIter = clientBlocking.lotsOfPongs(request);
+        int count = 0;
+        while (respIter.hasNext()) {
+            PongResponse resp = respIter.next();
+            assertEquals(sequence, resp.getLastSequence());
+            assertEquals(1, resp.getNumPingReceived());
+            assertEquals(count, resp.getSlotId());
+            ++count;
+        }
+        // without this check an empty (or truncated) stream would pass silently
+        assertEquals(NUM_PONGS_PER_PING, count);
+    }
+
+    /**
+     * Stream a batch of pings with consecutive sequences and verify the single pong sent back
+     * once the request stream is completed.
+     */
+    @Test
+    public void testClientStreaming() throws Exception {
+        final int numPings = 100;
+        final long sequence = ThreadLocalRandom.current().nextLong(100000);
+        // completed (or failed) when the response stream terminates
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = client.lotsOfPings(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        for (int i = 0; i < numPings; i++) {
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence + i)
+                .build();
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received.
+        result(respFuture);
+
+        assertEquals(1, respQueue.size());
+
+        // the pong reports the sequence of the last ping and the total number of pings
+        PongResponse resp = respQueue.take();
+        assertEquals(sequence + numPings - 1, resp.getLastSequence());
+        assertEquals(numPings, resp.getNumPingReceived());
+        assertEquals(0, resp.getSlotId());
+    }
+
+    /**
+     * Stream a batch of pings over the bidi call and verify that one pong is received per ping,
+     * in the order the pings were sent.
+     */
+    @Test
+    public void testBidiStreaming() throws Exception {
+        final int numPings = 100;
+
+        // completed (or failed) when the response stream terminates
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = client.bidiPingPong(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        // remembers the requests in send order so that responses can be matched one-to-one
+        final LinkedBlockingQueue<PingRequest> reqQueue = new LinkedBlockingQueue<>();
+        for (int i = 0; i < numPings; i++) {
+            final long sequence = ThreadLocalRandom.current().nextLong(100000);
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence)
+                .build();
+            reqQueue.put(request);
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received
+        result(respFuture);
+
+        assertEquals(numPings, respQueue.size());
+
+        int count = 0;
+        for (PingRequest request : reqQueue) {
+            PongResponse response = respQueue.take();
+
+            // the n-th pong echoes the n-th ping and carries the running ping count
+            assertEquals(request.getSequence(), response.getLastSequence());
+            assertEquals(++count, response.getNumPingReceived());
+            assertEquals(0, response.getSlotId());
+        }
+        assertNull(respQueue.poll());
+        assertEquals(numPings, count);
+    }
+
+    /**
+     * A worker that waits for the start signal and then runs {@code func} a random number of
+     * times (between 10 and 99).
+     *
+     * <p>The first failure seen by any runner is stored in the shared {@code exceptionHolder}; all
+     * runners stop iterating once a failure is recorded. {@code doneLatch} is counted down exactly
+     * once per runner, whatever the outcome, so the waiting test can never hang.
+     */
+    @Data
+    static class Runner implements Runnable {
+
+        private final CountDownLatch startLatch;
+        private final CountDownLatch doneLatch;
+        private final AtomicReference<Exception> exceptionHolder;
+        private final ExceptionalFunction<Void, Void> func;
+
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+                int numIters = ThreadLocalRandom.current().nextInt(10, 100);
+                // stop early as soon as any runner has reported a failure
+                for (int i = 0; i < numIters && null == exceptionHolder.get(); i++) {
+                    func.apply(null);
+                }
+            } catch (InterruptedException e) {
+                // preserve the interrupt status for the executor
+                Thread.currentThread().interrupt();
+                exceptionHolder.compareAndSet(null, e);
+            } catch (Exception e) {
+                exceptionHolder.compareAndSet(null, e);
+            } catch (Throwable t) {
+                // assertion failures are errors, not exceptions; record them as well
+                exceptionHolder.compareAndSet(null, new RuntimeException(t));
+            } finally {
+                doneLatch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Run the unary, client-streaming, server-streaming and bidi-streaming tests concurrently,
+     * one worker thread per call type, and fail if any of them reports an exception.
+     */
+    @Test
+    public void testMixed() throws Exception {
+        int numTypes = 4;
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numTypes);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        ExecutorService executor = Executors.newFixedThreadPool(numTypes);
+        try {
+            // start unary tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testUnary();
+                return null;
+            }));
+
+            // start client streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testClientStreaming();
+                return null;
+            }));
+
+            // start server streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testServerStreaming();
+                return null;
+            }));
+
+            // start bidi streaming tests
+            executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+                testBidiStreaming();
+                return null;
+            }));
+
+            // start the tests
+            startLatch.countDown();
+
+            // wait for tests to complete
+            doneLatch.await();
+        } finally {
+            // do not leak the worker threads, even when the wait is interrupted
+            executor.shutdownNow();
+        }
+
+        // make sure all succeed
+        assertNull("Exception found : " + exception.get(), exception.get());
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
new file mode 100644
index 0000000..a099717
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test reverse grpc proxy using ping pong service.
+ */
+public class ProxyPingPongServiceTest extends PingPongServiceTestBase {
+
+    public ProxyPingPongServiceTest() {
+        // true: a proxy server is placed in front of the real server
+        super(true);
+    }
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index f2580ec..340c6f4 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -30,6 +30,7 @@
 
   <modules>
     <module>common</module>
+    <module>tests-common</module>
     <module>statelib</module>
     <module>api</module>
     <module>proto</module>
diff --git a/stream/proto/pom.xml b/stream/proto/pom.xml
index c943c77..644c312 100644
--- a/stream/proto/pom.xml
+++ b/stream/proto/pom.xml
@@ -41,6 +41,12 @@
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
index cf9b917..3eb440a 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/stream/proto/pom.xml b/stream/tests-common/pom.xml
similarity index 83%
copy from stream/proto/pom.xml
copy to stream/tests-common/pom.xml
index c943c77..0f6cc10 100644
--- a/stream/proto/pom.xml
+++ b/stream/tests-common/pom.xml
@@ -15,27 +15,23 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bookkeeper</groupId>
     <artifactId>stream-storage-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
+    <version>4.8.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
-  <groupId>org.apache.bookkeeper</groupId>
-  <artifactId>stream-storage-proto</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Proto</name>
+  <groupId>org.apache.bookkeeper.tests</groupId>
+  <artifactId>stream-storage-tests-common</artifactId>
+  <name>Apache BookKeeper :: Stream Storage :: Common Classes for Tests</name>
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-common</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
diff --git a/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
new file mode 100644
index 0000000..4a94776
--- /dev/null
+++ b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.rpc;
+
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceImplBase;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+
+/**
+ * A ping pong service implementation used for testing. It covers the unary,
+ * client-streaming, server-streaming and bidi-streaming call types.
+ */
+@Slf4j
+public class PingPongService extends PingPongServiceImplBase {
+
+    // number of pongs sent back for one ping in the server-streaming call
+    private final int streamPongSize;
+
+    public PingPongService(int streamPongSize) {
+        this.streamPongSize = streamPongSize;
+    }
+
+    /**
+     * Assemble a pong response from its three fields.
+     */
+    private static PongResponse pong(long lastSequence, int numPingReceived, int slotId) {
+        return PongResponse.newBuilder()
+            .setLastSequence(lastSequence)
+            .setNumPingReceived(numPingReceived)
+            .setSlotId(slotId)
+            .build();
+    }
+
+    /**
+     * Unary call: answer one ping with one pong echoing its sequence.
+     */
+    @Override
+    public void pingPong(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        responseObserver.onNext(pong(request.getSequence(), 1, 0));
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Client-streaming call: count the pings and answer with a single pong on completion.
+     */
+    @Override
+    public StreamObserver<PingRequest> lotsOfPings(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int received = 0;
+            long latestSequence = -1L;
+
+            @Override
+            public void onNext(PingRequest ping) {
+                received++;
+                latestSequence = ping.getSequence();
+            }
+
+            @Override
+            public void onError(Throwable cause) {
+                log.error("Failed on receiving stream of pings", cause);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onNext(pong(latestSequence, received, 0));
+                responseObserver.onCompleted();
+            }
+        };
+    }
+
+    /**
+     * Server-streaming call: answer one ping with {@code streamPongSize} pongs, whose slot ids
+     * are their positions in the stream.
+     */
+    @Override
+    public void lotsOfPongs(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        final long echoed = request.getSequence();
+        int slot = 0;
+        while (slot < streamPongSize) {
+            responseObserver.onNext(pong(echoed, 1, slot));
+            slot++;
+        }
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Bidi-streaming call: answer every ping immediately with a pong carrying the running count.
+     */
+    @Override
+    public StreamObserver<PingRequest> bidiPingPong(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int received = 0;
+
+            @Override
+            public void onNext(PingRequest ping) {
+                received++;
+                responseObserver.onNext(pong(ping.getSequence(), received, 0));
+            }
+
+            @Override
+            public void onError(Throwable cause) {
+                responseObserver.onError(cause);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/stream/proto/src/main/proto/proto2_coder_test_messages.proto b/stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
similarity index 100%
rename from stream/proto/src/main/proto/proto2_coder_test_messages.proto
rename to stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
diff --git a/stream/tests-common/src/main/proto/rpc.proto b/stream/tests-common/src/main/proto/rpc.proto
new file mode 100644
index 0000000..8bda369
--- /dev/null
+++ b/stream/tests-common/src/main/proto/rpc.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package bookkeeper.tests.proto.rpc;
+
+option java_multiple_files = true;
+option java_package = "org.bookkeeper.tests.proto.rpc";
+
+// A ping sent by the client.
+message PingRequest {
+    // the sequence number carried by this ping.
+    int64 sequence = 1;
+}
+
+// A pong sent back by the server.
+message PongResponse {
+    // the sequence of the last ping covered by this pong.
+    int64 last_sequence = 1;
+    // the number of pings received when this pong was produced.
+    int32 num_ping_received = 2;
+    // the slot id in this stream of pong responses.
+    int32 slot_id = 3;
+}
+
+// A test service exposing one rpc per grpc call type.
+service PingPongService {
+
+    // unary: one ping, one pong.
+    rpc PingPong(PingRequest) returns (PongResponse) {}
+
+    // client streaming: a stream of pings, one pong.
+    rpc LotsOfPings(stream PingRequest) returns (PongResponse) {}
+
+    // server streaming: one ping, a stream of pongs.
+    rpc LotsOfPongs(PingRequest) returns (stream PongResponse) {}
+
+    // bidi streaming: a stream of pings, a stream of pongs.
+    rpc BidiPingPong(stream PingRequest) returns (stream PongResponse) {}
+
+}
+
+

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.