You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/26 18:27:10 UTC
[bookkeeper] branch master updated: [TABLE SERVICE] introduce a
client interceptor to attach routing information for table service rpc
calls
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ea334ea [TABLE SERVICE] introduce a client interceptor to attach routing information for table service rpc calls
ea334ea is described below
commit ea334eab4a95654ec6a15b7c23aaf4c8c79ba11c
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Sep 26 11:27:06 2018 -0700
[TABLE SERVICE] introduce a client interceptor to attach routing information for table service rpc calls
Descriptions of the changes in this PR:
*Motivation*
Currently java client deals with request routing and attach routing information before sending the request.
If we try to apply same mechanism for non-java language clients, it requires implementing this complex
routing logic for every non-java language client, which is not a desirable solution.
So we should move the request routing to the server side, which can simplify the implementation of non-java
language client.
*Changes*
Introduce a client interceptor which intercepts table service rpc calls and mutating the rpc request with
correct routing information. This interceptor can be used at server side for proxying grpc requests.
Author: Qi Wang <42...@users.noreply.github.com>
Author: Sijie Guo <gu...@gmail.com>
Author: Sijie Guo <si...@apache.org>
Author: Charan Reddy Guttapalem <re...@gmail.com>
Reviewers: Jia Zhai <None>
This closes #1708 from sijie/proxy_requests
---
.../clients/impl/channel/StorageServerChannel.java | 8 +-
.../RoutingHeaderClientInterceptor.java | 273 +++++++++++++++++++++
.../clients/impl/kv/interceptors/package-info.java | 23 ++
.../RoutingHeaderClientInterceptorTest.java | 270 ++++++++++++++++++++
.../stream/protocol/ProtocolConstants.java | 5 +
5 files changed, 577 insertions(+), 2 deletions(-)
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 8660aab..76c7631 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.clients.impl.channel;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Channel;
+import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -143,10 +144,13 @@ public class StorageServerChannel implements AutoCloseable {
* @return an intercepted server channel.
*/
public StorageServerChannel intercept(long scId) {
+ return intercept(new StorageContainerClientInterceptor(scId));
+ }
+
+ public StorageServerChannel intercept(ClientInterceptor... interceptors) {
Channel interceptedChannel = ClientInterceptors.intercept(
this.channel,
- new StorageContainerClientInterceptor(scId));
-
+ interceptors);
return new StorageServerChannel(
interceptedChannel,
this.token);
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
new file mode 100644
index 0000000..2ef970f
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.google.protobuf.UnsafeByteOperations;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A client interceptor that intercepting kv rpcs to attach routing information.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptor implements ClientInterceptor {
+
+ static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+ RANGE_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+ STREAM_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+ ROUTING_KEY,
+ IdentityBinaryMarshaller.of()
+ );
+
+ /**
+ * Table request mutator that mutates a table service rpc request to attach
+ * the routing information.
+ */
+ private interface TableRequestMutator<ReqT> {
+
+ /**
+ * Mutate the provided <tt>request</tt> to attach the given routing information.
+ *
+ * @param request table request to be mutated
+ * @param sid stream id
+ * @param rid range id
+ * @param rk routing key
+ * @return the mutated request
+ */
+ ReqT intercept(ReqT request,
+ Long sid,
+ Long rid,
+ byte[] rk);
+
+ }
+
+ private static RoutingHeader.Builder newRoutingHeaderBuilder(RoutingHeader header,
+ Long sid,
+ Long rid,
+ byte[] rk) {
+ return RoutingHeader.newBuilder(header)
+ .setStreamId(sid)
+ .setRangeId(rid)
+ .setRKey(UnsafeByteOperations.unsafeWrap(rk));
+ }
+
+ private static final TableRequestMutator<PutRequest> PUT_INTERCEPTOR =
+ (request, sid, rid, rk) -> PutRequest.newBuilder(request)
+ .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+ .build();
+
+ private static final TableRequestMutator<RangeRequest> RANGE_INTERCEPTOR =
+ (request, sid, rid, rk) -> RangeRequest.newBuilder(request)
+ .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+ .build();
+
+ private static final TableRequestMutator<DeleteRangeRequest> DELETE_INTERCEPTOR =
+ (request, sid, rid, rk) -> DeleteRangeRequest.newBuilder(request)
+ .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+ .build();
+
+ private static final TableRequestMutator<IncrementRequest> INCR_INTERCEPTOR =
+ (request, sid, rid, rk) -> IncrementRequest.newBuilder(request)
+ .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+ .build();
+
+ private static final TableRequestMutator<TxnRequest> TXN_INTERCEPTOR =
+ (request, sid, rid, rk) -> TxnRequest.newBuilder(request)
+ .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+ .build();
+
+ @Data(staticConstructor = "of")
+ private static class InterceptorDescriptor<T extends MessageLite> {
+
+ private final Class<T> clz;
+ private final Parser<T> parser;
+ private final TableRequestMutator<T> interceptor;
+
+ }
+
+ private static Map<String, InterceptorDescriptor<?>> kvRpcMethods = new HashMap<>();
+ static {
+ kvRpcMethods.put(
+ TableServiceGrpc.getPutMethod().getFullMethodName(),
+ InterceptorDescriptor.of(
+ PutRequest.class, PutRequest.parser(), PUT_INTERCEPTOR
+ )
+ );
+ kvRpcMethods.put(
+ TableServiceGrpc.getRangeMethod().getFullMethodName(),
+ InterceptorDescriptor.of(
+ RangeRequest.class, RangeRequest.parser(), RANGE_INTERCEPTOR
+ )
+ );
+ kvRpcMethods.put(
+ TableServiceGrpc.getDeleteMethod().getFullMethodName(),
+ InterceptorDescriptor.of(
+ DeleteRangeRequest.class, DeleteRangeRequest.parser(), DELETE_INTERCEPTOR
+ )
+ );
+ kvRpcMethods.put(
+ TableServiceGrpc.getIncrementMethod().getFullMethodName(),
+ InterceptorDescriptor.of(
+ IncrementRequest.class, IncrementRequest.parser(), INCR_INTERCEPTOR
+ )
+ );
+ kvRpcMethods.put(
+ TableServiceGrpc.getTxnMethod().getFullMethodName(),
+ InterceptorDescriptor.of(
+ TxnRequest.class, TxnRequest.parser(), TXN_INTERCEPTOR
+ )
+ );
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+ CallOptions callOptions,
+ Channel next) {
+ if (log.isTraceEnabled()) {
+ log.trace("Intercepting method {}", method.getFullMethodName());
+ }
+ InterceptorDescriptor<?> descriptor = kvRpcMethods.get(method.getFullMethodName());
+ if (null != descriptor) {
+ return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+
+ private Long rid = null;
+ private Long sid = null;
+ private byte[] rk = null;
+
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ // capture routing information from headers
+ sid = headers.get(SID_METADATA_KEY);
+ rid = headers.get(RID_METADATA_KEY);
+ rk = headers.get(RK_METADATA_KEY);
+ if (log.isTraceEnabled()) {
+ log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}",
+ sid, rid, rk);
+ }
+
+ delegate().start(responseListener, headers);
+ }
+
+ @Override
+ public void sendMessage(ReqT message) {
+ ReqT interceptedMessage;
+ if (null == rid || null == sid || null == rk) {
+ // we don't have enough information to form the new routing header
+ // so do nothing
+ interceptedMessage = message;
+ } else {
+ interceptedMessage = interceptMessage(
+ method,
+ descriptor,
+ message,
+ sid,
+ rid,
+ rk
+ );
+ }
+ delegate().sendMessage(interceptedMessage);
+ }
+ };
+ } else {
+ return next.newCall(method, callOptions);
+ }
+ }
+
+ private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
+ MethodDescriptor<ReqT, ?> method,
+ InterceptorDescriptor<TableReqT> descriptor,
+ ReqT message,
+ Long sid,
+ Long rid,
+ byte[] rk
+ ) {
+ if (null == descriptor) {
+ return message;
+ } else {
+ try {
+ return interceptTableRequest(method, descriptor, message, sid, rid, rk);
+ } catch (IOException ioe) {
+ return message;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <ReqT, TableReqT extends MessageLite> ReqT interceptTableRequest(
+ MethodDescriptor<ReqT, ?> method,
+ InterceptorDescriptor<TableReqT> interceptor,
+ ReqT message,
+ Long sid, Long rid, byte[] rk
+ ) throws IOException {
+ // put request
+ TableReqT request;
+ if (message.getClass() == interceptor.getClz()) {
+ request = (TableReqT) message;
+ } else {
+ InputStream is = method.getRequestMarshaller().stream(message);
+ request = interceptor.getParser().parseFrom(is);
+ }
+ TableReqT interceptedMessage = interceptor.getInterceptor().intercept(
+ request, sid, rid, rk
+ );
+ if (message.getClass() == interceptor.getClz()) {
+ return (ReqT) interceptedMessage;
+ } else {
+ byte[] reqBytes = new byte[interceptedMessage.getSerializedSize()];
+ interceptedMessage.writeTo(CodedOutputStream.newInstance(reqBytes));
+ return method.getRequestMarshaller().parse(new ByteArrayInputStream(reqBytes));
+
+ }
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
new file mode 100644
index 0000000..2e56f75
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Grpc Interceptors for table service.
+ */
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
\ No newline at end of file
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
new file mode 100644
index 0000000..745118b
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.ByteString;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RoutingHeaderClientInterceptor}.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+
+ private final long streamId = 1234L;
+ private final long rangeId = 2345L;
+ private final byte[] routingKey = ("routing-key-" + System.currentTimeMillis()).getBytes(UTF_8);
+ private final AtomicReference<Object> receivedRequest = new AtomicReference<>();
+ private StorageServerChannel channel;
+
+ @Override
+ protected void doSetup() {
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void range(RangeRequest request, StreamObserver<RangeResponse> responseObserver) {
+ log.info("Received range request : {}", request);
+ receivedRequest.set(request);
+ responseObserver.onNext(RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void delete(DeleteRangeRequest request, StreamObserver<DeleteRangeResponse> responseObserver) {
+ log.info("Received delete range request : {}", request);
+ receivedRequest.set(request);
+ responseObserver.onNext(DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void txn(TxnRequest request, StreamObserver<TxnResponse> responseObserver) {
+ log.info("Received txn request : {}", request);
+ receivedRequest.set(request);
+ responseObserver.onNext(TxnResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void increment(IncrementRequest request, StreamObserver<IncrementResponse> responseObserver) {
+ log.info("Received incr request : {}", request);
+ receivedRequest.set(request);
+ responseObserver.onNext(IncrementResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void put(PutRequest request, StreamObserver<PutResponse> responseObserver) {
+ log.info("Received put request : {}", request);
+ receivedRequest.set(request);
+ responseObserver.onNext(PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+
+ this.channel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty()
+ ).intercept(
+ new RoutingHeaderClientInterceptor(),
+ new ClientInterceptor() {
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+ CallOptions callOptions,
+ Channel next) {
+ return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+ @Override
+ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) {
+ log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}",
+ streamId, rangeId, new String(routingKey, UTF_8));
+ headers.put(
+ RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+ rangeId
+ );
+ headers.put(
+ RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+ streamId
+ );
+ headers.put(
+ RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+ routingKey
+ );
+ delegate().start(responseListener, headers);
+ }
+ };
+ }
+ }
+ );
+ }
+
+ @Override
+ protected void doTeardown() {
+ }
+
+ @Test
+ public void testPutRequest() throws Exception {
+ PutRequest request = PutRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("test-key"))
+ .build();
+ PutRequest expectedRequest = PutRequest.newBuilder(request)
+ .setHeader(RoutingHeader.newBuilder(request.getHeader())
+ .setStreamId(streamId)
+ .setRangeId(rangeId)
+ .setRKey(ByteString.copyFrom(routingKey))
+ .build())
+ .build();
+ PutResponse response = this.channel.getTableService().put(request).get();
+
+ assertEquals(expectedRequest, receivedRequest.get());
+ assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+ }
+
+ @Test
+ public void testRangeRequest() throws Exception {
+ RangeRequest request = RangeRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("test-key"))
+ .build();
+ RangeRequest expectedRequest = RangeRequest.newBuilder(request)
+ .setHeader(RoutingHeader.newBuilder(request.getHeader())
+ .setStreamId(streamId)
+ .setRangeId(rangeId)
+ .setRKey(ByteString.copyFrom(routingKey))
+ .build())
+ .build();
+ RangeResponse response = this.channel.getTableService()
+ .range(request).get();
+
+ assertEquals(expectedRequest, receivedRequest.get());
+ assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+ }
+
+ @Test
+ public void testDeleteRangeRequest() throws Exception {
+ DeleteRangeRequest request = DeleteRangeRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("test-key"))
+ .build();
+ DeleteRangeRequest expectedRequest = DeleteRangeRequest.newBuilder(request)
+ .setHeader(RoutingHeader.newBuilder(request.getHeader())
+ .setStreamId(streamId)
+ .setRangeId(rangeId)
+ .setRKey(ByteString.copyFrom(routingKey))
+ .build())
+ .build();
+ DeleteRangeResponse response = this.channel.getTableService()
+ .delete(request).get();
+
+ assertEquals(expectedRequest, receivedRequest.get());
+ assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+ }
+
+ @Test
+ public void testIncrementRequest() throws Exception {
+ IncrementRequest request = IncrementRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8("test-key"))
+ .build();
+ IncrementRequest expectedRequest = IncrementRequest.newBuilder(request)
+ .setHeader(RoutingHeader.newBuilder(request.getHeader())
+ .setStreamId(streamId)
+ .setRangeId(rangeId)
+ .setRKey(ByteString.copyFrom(routingKey))
+ .build())
+ .build();
+ IncrementResponse response = this.channel.getTableService()
+ .increment(request).get();
+
+ assertEquals(expectedRequest, receivedRequest.get());
+ assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+ }
+
+ @Test
+ public void testTxnRequest() throws Exception {
+ TxnRequest request = TxnRequest.newBuilder()
+ .build();
+ TxnRequest expectedRequest = TxnRequest.newBuilder(request)
+ .setHeader(RoutingHeader.newBuilder(request.getHeader())
+ .setStreamId(streamId)
+ .setRangeId(rangeId)
+ .setRKey(ByteString.copyFrom(routingKey))
+ .build())
+ .build();
+ TxnResponse response = this.channel.getTableService().txn(request).get();
+
+ assertEquals(expectedRequest, receivedRequest.get());
+ assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+ }
+
+}
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0e6900c..0088c4b 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -110,4 +110,9 @@ public final class ProtocolConstants {
// storage container request metadata key
public static final String SC_ID_KEY = "sc-id" + Metadata.BINARY_HEADER_SUFFIX;
+
+ // request metadata key for routing requests
+ public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX;
+ public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX;
+ public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX;
}