You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 21:01:30 UTC
[bookkeeper] 09/10: [TABLE SERVICE] client interceptor and storage
container grpc proxy
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit c9b32d7d3cc352dd6fdd998e91ba15f206b7a568
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri May 25 01:02:06 2018 -0700
[TABLE SERVICE] client interceptor and storage container grpc proxy
Descriptions of the changes in this PR:
This is a subsequent change after apache/bookkeeper#1428.
*Motivation*
Current almost every grpc requests are wrapped into `StorageContainerRequest` and their responses
are wrapped into `StorageContainerResponse`. It makes things a bit complicated on adding new grpc
services.
*Changes*
To simplify things, this PR introduces two functionalities for simplifying dispatching container requests/responses.
1) *StorageContainerClientInterceptor*: A grpc `ClientInterceptor` that stamps container information (currently is `scId`) into the requests' metadata before sending the requests to the wire.
2) A simple grpc reverse proxy to dispatch grpc requests to the channels provided by a `ChannelFinder`.
*Tests*
1. Existing unit tests covered client interceptor changes.
2. Introduced a `stream-storage-tests-common` module to include common classes that would be used for testing.
3. Introduced a `PingPongService` for testing reverse proxy : unary/client-streaming/server-streaming/bidi-streaming.
Master Issue: #1205
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1430 from sijie/interceptor_container_requests
---
.../clients/impl/channel/StorageServerChannel.java | 30 +-
.../impl/container/StorageContainerChannel.java | 6 +-
.../StorageContainerClientInterceptor.java | 60 ++++
stream/common/pom.xml | 6 +
.../grpc/netty/IdentityBinaryMarshaller.java | 45 +++
.../grpc/netty/IdentityInputStreamMarshaller.java | 46 +++
.../common/grpc/netty/LongBinaryMarshaller.java | 46 +++
.../bookkeeper/common/grpc/netty/package-info.java | 22 ++
.../bookkeeper/common/grpc/package-info.java | 22 ++
.../common/grpc/proxy/ChannelFinder.java | 40 +++
.../bookkeeper/common/grpc/proxy/ProxyCall.java | 138 +++++++++
.../common/grpc/proxy/ProxyHandlerRegistry.java | 135 ++++++++
.../common/grpc/proxy/ProxyServerCallHandler.java | 56 ++++
.../bookkeeper/common/grpc/proxy/package-info.java | 22 ++
.../grpc/netty/IdentityBinaryMarshallerTest.java | 52 ++++
.../netty/IdentityInputStreamMarshallerTest.java | 44 +++
.../grpc/netty/LongBinaryMarshallerTest.java | 41 +++
.../grpc/proxy/DirectPingPongServiceTest.java | 31 ++
.../common/grpc/proxy/PingPongServiceTestBase.java | 345 +++++++++++++++++++++
.../grpc/proxy/ProxyPingPongServiceTest.java | 29 ++
stream/pom.xml | 1 +
stream/proto/pom.xml | 6 +
.../TestStorageContainerResponseHandler.java | 2 +-
stream/{proto => tests-common}/pom.xml | 20 +-
.../bookkeeper/tests/rpc/PingPongService.java | 119 +++++++
.../main/proto/proto2_coder_test_messages.proto | 0
stream/tests-common/src/main/proto/rpc.proto | 48 +++
27 files changed, 1396 insertions(+), 16 deletions(-)
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 34dc957..7e1e022 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -19,12 +19,15 @@
package org.apache.bookkeeper.clients.impl.channel;
import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -53,7 +56,7 @@ public class StorageServerChannel implements AutoCloseable {
}
private final Optional<String> token;
- private final ManagedChannel channel;
+ private final Channel channel;
@GuardedBy("this")
private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +90,11 @@ public class StorageServerChannel implements AutoCloseable {
@VisibleForTesting
public StorageServerChannel(ManagedChannel channel,
Optional<String> token) {
+ this((Channel) channel, token);
+ }
+
+ protected StorageServerChannel(Channel channel,
+ Optional<String> token) {
this.token = token;
this.channel = channel;
}
@@ -127,8 +135,26 @@ public class StorageServerChannel implements AutoCloseable {
return kvService;
}
+ /**
+ * Create an intercepted server channel that add additional storage container metadata.
+ *
+ * @param scId storage container id
+ * @return an intercepted server channel.
+ */
+ public StorageServerChannel intercept(long scId) {
+ Channel interceptedChannel = ClientInterceptors.intercept(
+ this.channel,
+ new StorageContainerClientInterceptor(scId));
+
+ return new StorageServerChannel(
+ interceptedChannel,
+ this.token);
+ }
+
@Override
public void close() {
- channel.shutdown();
+ if (channel instanceof ManagedChannel) {
+ ((ManagedChannel) channel).shutdown();
+ }
}
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 8635e0f..a20d7c9 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -190,9 +190,13 @@ public class StorageContainerChannel {
}
return;
}
+
+ // intercept the storage server channel with additional sc metadata
+ StorageServerChannel interceptedChannel = serverChannel.intercept(scId);
+
// update the future
synchronized (this) {
- rsChannelFuture.complete(serverChannel);
+ rsChannelFuture.complete(interceptedChannel);
}
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
new file mode 100644
index 0000000..284431e
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.container;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+
+/**
+ * A client interceptor that intercepting outgoing calls to storage containers.
+ */
+public class StorageContainerClientInterceptor implements ClientInterceptor {
+
+ private static final String SC_ID_KEY = "SC_ID";
+
+ private final long scId;
+ private final Metadata.Key<Long> scIdKey;
+
+ public StorageContainerClientInterceptor(long scId) {
+ this.scId = scId;
+ this.scIdKey = Metadata.Key.of(
+ SC_ID_KEY,
+ LongBinaryMarshaller.of());
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+ CallOptions callOptions,
+ Channel next) {
+ return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+ @Override
+ protected void checkedStart(Listener<RespT> responseListener,
+ Metadata headers) throws Exception {
+ headers.put(scIdKey, scId);
+ delegate().start(responseListener, headers);
+ }
+ };
+ }
+}
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 43fdc36..50e4edd 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -40,5 +40,11 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.tests</groupId>
+ <artifactId>stream-storage-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
new file mode 100644
index 0000000..8b1c939
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+
+/**
+ * Marshaller for byte array.
+ */
+public class IdentityBinaryMarshaller implements BinaryMarshaller<byte[]> {
+
+ public static IdentityBinaryMarshaller of() {
+ return INSTANCE;
+ }
+
+ private static final IdentityBinaryMarshaller INSTANCE = new IdentityBinaryMarshaller();
+
+ private IdentityBinaryMarshaller() {}
+
+ @Override
+ public byte[] toBytes(byte[] value) {
+ return value;
+ }
+
+ @Override
+ public byte[] parseBytes(byte[] serialized) {
+ return serialized;
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
new file mode 100644
index 0000000..fd54b5e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.MethodDescriptor;
+import java.io.InputStream;
+
+/**
+ * An identity marshaller.
+ */
+public class IdentityInputStreamMarshaller implements MethodDescriptor.Marshaller<InputStream> {
+
+ public static IdentityInputStreamMarshaller of() {
+ return INSTANCE;
+ }
+
+ private static final IdentityInputStreamMarshaller INSTANCE = new IdentityInputStreamMarshaller();
+
+ private IdentityInputStreamMarshaller() {}
+
+ @Override
+ public InputStream stream(InputStream value) {
+ return value;
+ }
+
+ @Override
+ public InputStream parse(InputStream stream) {
+ return stream;
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
new file mode 100644
index 0000000..92194f1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+import org.apache.bookkeeper.common.util.Bytes;
+
+/**
+ * Marshaller for long numbers.
+ */
+public class LongBinaryMarshaller implements BinaryMarshaller<Long> {
+
+ public static LongBinaryMarshaller of() {
+ return INSTANCE;
+ }
+
+ private static final LongBinaryMarshaller INSTANCE = new LongBinaryMarshaller();
+
+ private LongBinaryMarshaller() {}
+
+ @Override
+ public byte[] toBytes(Long value) {
+ return Bytes.toBytes(value);
+ }
+
+ @Override
+ public Long parseBytes(byte[] serialized) {
+ return Bytes.toLong(serialized, 0);
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
new file mode 100644
index 0000000..3e69f68
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Netty Utils.
+ */
+package org.apache.bookkeeper.common.grpc.netty;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
new file mode 100644
index 0000000..d943bdf
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Utils.
+ */
+package org.apache.bookkeeper.common.grpc;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
new file mode 100644
index 0000000..698c380
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+/**
+ * Find a grpc {@link io.grpc.Channel} to route the requests.
+ */
+public interface ChannelFinder {
+
+ /**
+ * Find a channel to route the server call.
+ *
+ * @param serverCall server call
+ * @param headers request metadata
+ * @return channel to route the server call.
+ */
+ Channel findChannel(ServerCall<?, ?> serverCall,
+ Metadata headers);
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
new file mode 100644
index 0000000..35f6d2d
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import lombok.Getter;
+
+/**
+ * Proxy grpc calls.
+ */
+@Getter
+class ProxyCall<ReqT, RespT> {
+
+ private final RequestProxy serverCallListener;
+ private final ResponseProxy clientCallListener;
+
+ ProxyCall(ServerCall<ReqT, RespT> serverCall,
+ ClientCall<ReqT, RespT> clientCall) {
+ this.serverCallListener = new RequestProxy(clientCall);
+ this.clientCallListener = new ResponseProxy(serverCall);
+ }
+
+ /**
+ * Request proxy to delegate client call.
+ */
+ private class RequestProxy extends ServerCall.Listener<ReqT> {
+
+ private final ClientCall<ReqT, ?> clientCall;
+ private boolean needToRequest;
+
+ public RequestProxy(ClientCall<ReqT, ?> clientCall) {
+ this.clientCall = clientCall;
+ }
+
+ @Override
+ public void onMessage(ReqT message) {
+ clientCall.sendMessage(message);
+ synchronized (this) {
+ if (clientCall.isReady()) {
+ clientCallListener.serverCall.request(1);
+ } else {
+ needToRequest = true;
+ }
+ }
+ }
+
+ @Override
+ public void onHalfClose() {
+ clientCall.halfClose();
+ }
+
+ @Override
+ public void onCancel() {
+ clientCall.cancel("Server cancelled", null);
+ }
+
+ @Override
+ public void onReady() {
+ clientCallListener.onServerReady();
+ }
+
+ synchronized void onClientReady() {
+ if (needToRequest) {
+ clientCallListener.serverCall.request(1);
+ needToRequest = false;
+ }
+ }
+
+ }
+
+ /**
+ * Response proxy to delegate server call.
+ */
+ private class ResponseProxy extends ClientCall.Listener<RespT> {
+
+ private final ServerCall<?, RespT> serverCall;
+ private boolean needToRequest;
+
+ public ResponseProxy(ServerCall<?, RespT> serverCall) {
+ this.serverCall = serverCall;
+ }
+
+ @Override
+ public void onHeaders(Metadata headers) {
+ serverCall.sendHeaders(headers);
+ }
+
+ @Override
+ public void onMessage(RespT message) {
+ serverCall.sendMessage(message);
+ synchronized (this) {
+ if (serverCall.isReady()) {
+ serverCallListener.clientCall.request(1);
+ } else {
+ needToRequest = true;
+ }
+ }
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ serverCall.close(status, trailers);
+ }
+
+ @Override
+ public void onReady() {
+ serverCallListener.onClientReady();
+ }
+
+ synchronized void onServerReady() {
+ if (needToRequest) {
+ serverCallListener.clientCall.request(1);
+ needToRequest = false;
+ }
+ }
+
+ }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
new file mode 100644
index 0000000..94cebb1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.CallOptions;
+import io.grpc.HandlerRegistry;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.common.grpc.netty.IdentityInputStreamMarshaller;
+
+/**
+ * Registry for proxying grpc services.
+ */
+public class ProxyHandlerRegistry extends HandlerRegistry {
+
+ private final Map<String, ServerMethodDefinition<?, ?>> methods;
+
+ private ProxyHandlerRegistry(Map<String, ServerMethodDefinition<?, ?>> methods) {
+ this.methods = methods;
+ }
+
+ @Nullable
+ @Override
+ public ServerMethodDefinition<?, ?> lookupMethod(String methodName,
+ @Nullable String authority) {
+ return methods.get(methodName);
+ }
+
+ private static ServerMethodDefinition<?, ?> createProxyServerMethodDefinition(
+ MethodDescriptor<?, ?> methodDesc,
+ ServerCallHandler<InputStream, InputStream> handler) {
+ MethodDescriptor<InputStream, InputStream> methodDescriptor = MethodDescriptor.newBuilder(
+ IdentityInputStreamMarshaller.of(), IdentityInputStreamMarshaller.of())
+ .setFullMethodName(methodDesc.getFullMethodName())
+ .setType(methodDesc.getType())
+ .setIdempotent(methodDesc.isIdempotent())
+ .setSafe(methodDesc.isSafe())
+ .build();
+ return ServerMethodDefinition.create(methodDescriptor, handler);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder to build the handler registry.
+ */
+ public static class Builder {
+
+ // store per-service first, to make sure services are added/replaced atomically.
+ private final HashMap<String, ServerServiceDefinition> services =
+ new LinkedHashMap<>();
+ private ChannelFinder finder;
+
+ /**
+ * Add the service to this grpc handler registry.
+ *
+ * @param service grpc service definition
+ * @return registry builder
+ */
+ public Builder addService(ServerServiceDefinition service) {
+ services.put(
+ service.getServiceDescriptor().getName(),
+ service);
+ return this;
+ }
+
+ /**
+ * Registered a channel finder for proxying server calls.
+ *
+ * @param finder channel finder
+ * @return registry builder
+ */
+ public Builder setChannelFinder(ChannelFinder finder) {
+ this.finder = finder;
+ return this;
+ }
+
+ /**
+ * Build the proxy handler registry.
+ *
+ * @return registry builder
+ */
+ public ProxyHandlerRegistry build() {
+ checkNotNull(finder, "No channel finder defined");
+
+ ProxyServerCallHandler<InputStream, InputStream> proxyHandler =
+ new ProxyServerCallHandler<>(finder, CallOptions.DEFAULT);
+
+ Map<String, ServerMethodDefinition<?, ?>> methods = new HashMap<>();
+ for (ServerServiceDefinition service : services.values()) {
+ for (ServerMethodDefinition<?, ?> method : service.getMethods()) {
+ String methodName = method.getMethodDescriptor().getFullMethodName();
+ methods.put(
+ methodName,
+ createProxyServerMethodDefinition(
+ method.getMethodDescriptor(),
+ proxyHandler)
+ );
+ }
+ }
+ return new ProxyHandlerRegistry(
+ Collections.unmodifiableMap(methods));
+ }
+
+ }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
new file mode 100644
index 0000000..a691497
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+
+/**
+ * Abstract server call handler
+ */
+class ProxyServerCallHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
+
+ private final ChannelFinder finder;
+ private final CallOptions callOptions;
+
+ ProxyServerCallHandler(ChannelFinder finder,
+ CallOptions callOptions) {
+ this.finder = finder;
+ this.callOptions = callOptions;
+ }
+
+ @Override
+ public Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata headers) {
+ Channel channel = finder.findChannel(serverCall, headers);
+ ClientCall<ReqT, RespT> clientCall = channel.newCall(
+ serverCall.getMethodDescriptor(), callOptions);
+ ProxyCall<ReqT, RespT> proxyCall = new ProxyCall<>(
+ serverCall,
+ clientCall);
+ clientCall.start(proxyCall.getClientCallListener(), headers);
+ serverCall.request(1);
+ clientCall.request(1);
+ return proxyCall.getServerCallListener();
+ }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
new file mode 100644
index 0000000..7e023f5
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Grpc Utils for proxying grpc requests.
+ */
+package org.apache.bookkeeper.common.grpc.proxy;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
new file mode 100644
index 0000000..730cc6f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityBinaryMarshaller}.
+ */
+public class IdentityBinaryMarshallerTest {
+
+ @Test
+ public void testParseAndToBytes() {
+ byte[] data = new byte[32];
+ ThreadLocalRandom.current().nextBytes(data);
+ byte[] dataCopy = new byte[data.length];
+ System.arraycopy(data, 0, dataCopy, 0, data.length);
+
+ byte[] serializedData = IdentityBinaryMarshaller.of().toBytes(data);
+ // identity binary marshaller should return same object
+ assertSame(data, serializedData);
+ // identity binary marshaller should return same content
+ assertArrayEquals(dataCopy, serializedData);
+
+ byte[] deserializedData = IdentityBinaryMarshaller.of().parseBytes(data);
+ // identity binary marshaller should return same object
+ assertSame(data, deserializedData);
+ // identity binary marshaller should return same content
+ assertArrayEquals(dataCopy, deserializedData);
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
new file mode 100644
index 0000000..8cafc2f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityInputStreamMarshaller}.
+ */
+public class IdentityInputStreamMarshallerTest {
+
+ @Test
+ public void testStream() {
+ InputStream mockIs = mock(InputStream.class);
+ assertSame(mockIs, IdentityInputStreamMarshaller.of().stream(mockIs));
+ }
+
+ @Test
+ public void testParse() {
+ InputStream mockIs = mock(InputStream.class);
+ assertSame(mockIs, IdentityInputStreamMarshaller.of().parse(mockIs));
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
new file mode 100644
index 0000000..e4dccb5
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Unit test {@link LongBinaryMarshaller}.
+ */
+public class LongBinaryMarshallerTest {
+
+ @Test
+ public void testParseAndToBytes() {
+ long value = System.currentTimeMillis();
+ byte[] valueBytes = LongBinaryMarshaller.of().toBytes(value);
+ assertArrayEquals(Bytes.toBytes(value), valueBytes);
+ long parsedValue = LongBinaryMarshaller.of().parseBytes(valueBytes);
+ assertEquals(value, parsedValue);
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
new file mode 100644
index 0000000..4611efe
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test PingPongService by directly accessing the grpc service.
+ *
+ * <p>This is to ensure the tests in {@link PingPongServiceTestBase} are correct to be used for testing
+ * reverse proxy in {@link ProxyPingPongServiceTest}.
+ */
+public class DirectPingPongServiceTest extends PingPongServiceTestBase {
+ public DirectPingPongServiceTest() {
+ super(false);
+ }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
new file mode 100644
index 0000000..ce3dcfd
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import lombok.Data;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.ExceptionalFunction;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test grpc reverse proxy using {@link org.apache.bookkeeper.tests.rpc.PingPongService}.
+ */
+public abstract class PingPongServiceTestBase {
+
+ private static final int NUM_PONGS_PER_PING = 10;
+ private static final String SERVICE_NAME = "pingpong";
+
+ private final boolean useReverseProxy;
+
+ protected Server realServer;
+ protected Server proxyServer;
+ protected PingPongService service;
+ protected ManagedChannel proxyChannel;
+ protected ManagedChannel clientChannel;
+ protected PingPongServiceStub client;
+
+ PingPongServiceTestBase(boolean useReverseProxy) {
+ this.useReverseProxy = useReverseProxy;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ service = new PingPongService(NUM_PONGS_PER_PING);
+ ServerServiceDefinition pingPongServiceDef = service.bindService();
+
+ String serverName;
+ if (useReverseProxy) {
+ serverName = "proxy-" + SERVICE_NAME;
+ } else {
+ serverName = SERVICE_NAME;
+ }
+ // build a real server
+ MutableHandlerRegistry realRegistry = new MutableHandlerRegistry();
+ realServer = InProcessServerBuilder
+ .forName(serverName)
+ .fallbackHandlerRegistry(realRegistry)
+ .directExecutor()
+ .build()
+ .start();
+ realRegistry.addService(pingPongServiceDef);
+
+ if (useReverseProxy) {
+ proxyChannel = InProcessChannelBuilder.forName(serverName)
+ .usePlaintext(false)
+ .build();
+
+ ProxyHandlerRegistry registry = ProxyHandlerRegistry.newBuilder()
+ .addService(pingPongServiceDef)
+ .setChannelFinder((serverCall, header) -> proxyChannel)
+ .build();
+ proxyServer = InProcessServerBuilder
+ .forName(SERVICE_NAME)
+ .fallbackHandlerRegistry(registry)
+ .directExecutor()
+ .build()
+ .start();
+ } else {
+ proxyServer = realServer;
+ }
+
+ clientChannel = InProcessChannelBuilder.forName(SERVICE_NAME)
+ .usePlaintext(false)
+ .build();
+
+ client = PingPongServiceGrpc.newStub(clientChannel);
+
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (null != clientChannel) {
+ clientChannel.shutdown();
+ }
+
+ if (null != proxyServer) {
+ proxyServer.shutdown();
+ }
+
+ if (null != proxyChannel) {
+ proxyChannel.shutdown();
+ }
+
+ if (null != realServer && proxyServer != realServer) {
+ realServer.shutdown();
+ }
+ }
+
+
+ @Test
+ public void testUnary() {
+ PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+ long sequence = ThreadLocalRandom.current().nextLong();
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ PongResponse response = clientBlocking.pingPong(request);
+ assertEquals(sequence, response.getLastSequence());
+ assertEquals(1, response.getNumPingReceived());
+ assertEquals(0, response.getSlotId());
+ }
+
+ @Test
+ public void testServerStreaming() {
+ PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+ long sequence = ThreadLocalRandom.current().nextLong(100000);
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ Iterator<PongResponse> respIter = clientBlocking.lotsOfPongs(request);
+ int count = 0;
+ while (respIter.hasNext()) {
+ PongResponse resp = respIter.next();
+ assertEquals(sequence, resp.getLastSequence());
+ assertEquals(1, resp.getNumPingReceived());
+ assertEquals(count, resp.getSlotId());
+ ++count;
+ }
+ }
+
+ @Test
+ public void testClientStreaming() throws Exception {
+ final int numPings = 100;
+ final long sequence = ThreadLocalRandom.current().nextLong(100000);
+ final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+ final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+ StreamObserver<PingRequest> pinger = client.lotsOfPings(new StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse resp) {
+ respQueue.offer(resp);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ respFuture.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ FutureUtils.complete(respFuture, null);
+ }
+ });
+
+ for (int i = 0; i < numPings; i++) {
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence + i)
+ .build();
+ pinger.onNext(request);
+ }
+ pinger.onCompleted();
+
+ // wait for response to be received.
+ result(respFuture);
+
+ assertEquals(1, respQueue.size());
+
+ PongResponse resp = respQueue.take();
+ assertEquals(sequence + numPings - 1, resp.getLastSequence());
+ assertEquals(numPings, resp.getNumPingReceived());
+ assertEquals(0, resp.getSlotId());
+ }
+
+ @Test
+ public void testBidiStreaming() throws Exception {
+ final int numPings = 100;
+
+ final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+ final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+ StreamObserver<PingRequest> pinger = client.bidiPingPong(new StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse resp) {
+ respQueue.offer(resp);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ respFuture.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ FutureUtils.complete(respFuture, null);
+ }
+ });
+
+ final LinkedBlockingQueue<PingRequest> reqQueue = new LinkedBlockingQueue<>();
+ for (int i = 0; i < numPings; i++) {
+ final long sequence = ThreadLocalRandom.current().nextLong(100000);
+ PingRequest request = PingRequest.newBuilder()
+ .setSequence(sequence)
+ .build();
+ reqQueue.put(request);
+ pinger.onNext(request);
+ }
+ pinger.onCompleted();
+
+ // wait for response to be received
+ result(respFuture);
+
+ assertEquals(numPings, respQueue.size());
+
+ int count = 0;
+ for (PingRequest request : reqQueue) {
+ PongResponse response = respQueue.take();
+
+ assertEquals(request.getSequence(), response.getLastSequence());
+ assertEquals(++count, response.getNumPingReceived());
+ assertEquals(0, response.getSlotId());
+ }
+ assertNull(respQueue.poll());
+ assertEquals(numPings, count);
+ }
+
+ @Data
+ static class Runner implements Runnable {
+
+ private final CountDownLatch startLatch;
+ private final CountDownLatch doneLatch;
+ private final AtomicReference<Exception> exceptionHolder;
+ private final ExceptionalFunction<Void, Void> func;
+
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ }
+ int numIters = ThreadLocalRandom.current().nextInt(10, 100);
+ IntStream.of(numIters).forEach(idx -> {
+ if (null != exceptionHolder.get()) {
+ // break if exception occurs
+ return;
+ }
+ try {
+ func.apply(null);
+ } catch (Exception e) {
+ exceptionHolder.set(e);
+ doneLatch.countDown();
+ }
+ });
+ if (null == exceptionHolder.get()) {
+ doneLatch.countDown();
+ }
+ }
+ }
+
+ @Test
+ public void testMixed() throws Exception {
+ int numTypes = 4;
+
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch doneLatch = new CountDownLatch(numTypes);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ ExecutorService executor = Executors.newFixedThreadPool(numTypes);
+ // start unmary test
+ executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+ testUnary();
+ return null;
+ }));
+
+ // start client streaming tests
+ executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+ testClientStreaming();
+ return null;
+ }));
+
+ // start server streaming tests
+ executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+ testServerStreaming();
+ return null;
+ }));
+
+ // start bidi streaming tests
+ executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+ testBidiStreaming();
+ return null;
+ }));
+
+ // start the tests
+ startLatch.countDown();
+
+ // wait for tests to complete
+ doneLatch.await();
+
+ // make sure all succeed
+ assertNull("Exception found : " + exception.get(), exception.get());
+ }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
new file mode 100644
index 0000000..a099717
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test reverse grpc proxy using ping pong service.
+ */
+public class ProxyPingPongServiceTest extends PingPongServiceTestBase {
+
+ public ProxyPingPongServiceTest() {
+ super(true);
+ }
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index f2580ec..340c6f4 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -30,6 +30,7 @@
<modules>
<module>common</module>
+ <module>tests-common</module>
<module>statelib</module>
<module>api</module>
<module>proto</module>
diff --git a/stream/proto/pom.xml b/stream/proto/pom.xml
index c943c77..644c312 100644
--- a/stream/proto/pom.xml
+++ b/stream/proto/pom.xml
@@ -41,6 +41,12 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.tests</groupId>
+ <artifactId>stream-storage-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
index cf9b917..3eb440a 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/stream/proto/pom.xml b/stream/tests-common/pom.xml
similarity index 83%
copy from stream/proto/pom.xml
copy to stream/tests-common/pom.xml
index c943c77..0f6cc10 100644
--- a/stream/proto/pom.xml
+++ b/stream/tests-common/pom.xml
@@ -15,27 +15,23 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-parent</artifactId>
- <version>4.7.1-SNAPSHOT</version>
+ <version>4.8.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>stream-storage-proto</artifactId>
- <name>Apache BookKeeper :: Stream Storage :: Proto</name>
+ <groupId>org.apache.bookkeeper.tests</groupId>
+ <artifactId>stream-storage-tests-common</artifactId>
+ <name>Apache BookKeeper :: Stream Storage :: Common Classes for Tests</name>
<dependencies>
<dependency>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>stream-storage-common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
diff --git a/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
new file mode 100644
index 0000000..4a94776
--- /dev/null
+++ b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.rpc;
+
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceImplBase;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+
+/**
+ * An implementation of the ping pong service used for testing.
+ */
+@Slf4j
+public class PingPongService extends PingPongServiceImplBase {
+
+ private final int streamPongSize;
+
+ public PingPongService(int streamPongSize) {
+ this.streamPongSize = streamPongSize;
+ }
+
+ @Override
+ public void pingPong(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+ responseObserver.onNext(PongResponse.newBuilder()
+ .setLastSequence(request.getSequence())
+ .setNumPingReceived(1)
+ .setSlotId(0)
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public StreamObserver<PingRequest> lotsOfPings(StreamObserver<PongResponse> responseObserver) {
+ return new StreamObserver<PingRequest>() {
+
+ int pingCount = 0;
+ long lastSequence = -1L;
+
+ @Override
+ public void onNext(PingRequest value) {
+ pingCount++;
+ lastSequence = value.getSequence();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("Failed on receiving stream of pings", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onNext(PongResponse.newBuilder()
+ .setNumPingReceived(pingCount)
+ .setLastSequence(lastSequence)
+ .setSlotId(0)
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+
+ @Override
+ public void lotsOfPongs(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+ long sequence = request.getSequence();
+ for (int i = 0; i < streamPongSize; i++) {
+ responseObserver.onNext(PongResponse.newBuilder()
+ .setLastSequence(sequence)
+ .setNumPingReceived(1)
+ .setSlotId(i)
+ .build());
+ }
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public StreamObserver<PingRequest> bidiPingPong(StreamObserver<PongResponse> responseObserver) {
+ return new StreamObserver<PingRequest>() {
+
+ int pingCount = 0;
+
+ @Override
+ public void onNext(PingRequest ping) {
+ pingCount++;
+ responseObserver.onNext(PongResponse.newBuilder()
+ .setLastSequence(ping.getSequence())
+ .setNumPingReceived(pingCount)
+ .setSlotId(0)
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ responseObserver.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
diff --git a/stream/proto/src/main/proto/proto2_coder_test_messages.proto b/stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
similarity index 100%
rename from stream/proto/src/main/proto/proto2_coder_test_messages.proto
rename to stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
diff --git a/stream/tests-common/src/main/proto/rpc.proto b/stream/tests-common/src/main/proto/rpc.proto
new file mode 100644
index 0000000..8bda369
--- /dev/null
+++ b/stream/tests-common/src/main/proto/rpc.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package bookkeeper.tests.proto.rpc;
+
+option java_multiple_files = true;
+option java_package = "org.bookkeeper.tests.proto.rpc";
+
+message PingRequest {
+ int64 sequence = 1;
+}
+
+message PongResponse {
+ int64 last_sequence = 1;
+ int32 num_ping_received = 2;
+ // the slot id in this stream of pong responses.
+ int32 slot_id = 3;
+}
+
+service PingPongService {
+
+ rpc PingPong(PingRequest) returns (PongResponse) {}
+
+ rpc LotsOfPings(stream PingRequest) returns (PongResponse) {}
+
+ rpc LotsOfPongs(PingRequest) returns (stream PongResponse) {}
+
+ rpc BidiPingPong(stream PingRequest) returns (stream PongResponse) {}
+
+}
+
+
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.