You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/26 18:27:10 UTC

[bookkeeper] branch master updated: [TABLE SERVICE] introduce a client interceptor to attach routing information for table service rpc calls

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ea334ea  [TABLE SERVICE] introduce a client interceptor to attach routing information for table service rpc calls
ea334ea is described below

commit ea334eab4a95654ec6a15b7c23aaf4c8c79ba11c
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Sep 26 11:27:06 2018 -0700

    [TABLE SERVICE] introduce a client interceptor to attach routing information for table service rpc calls
    
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Currently the Java client handles request routing itself and attaches routing information before sending
    each request. Applying the same mechanism to non-Java clients would require re-implementing this complex
    routing logic in every non-Java language client, which is not a desirable solution.
    
    So we should move the request routing to the server side, which simplifies the implementation of non-Java
    language clients.
    
    *Changes*
    
    Introduce a client interceptor which intercepts table service rpc calls and mutates each rpc request with the
    correct routing information. This interceptor can be used on the server side for proxying gRPC requests.
    
    
    
    
    Author: Qi Wang <42...@users.noreply.github.com>
    Author: Sijie Guo <gu...@gmail.com>
    Author: Sijie Guo <si...@apache.org>
    Author: Charan Reddy Guttapalem <re...@gmail.com>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1708 from sijie/proxy_requests
---
 .../clients/impl/channel/StorageServerChannel.java |   8 +-
 .../RoutingHeaderClientInterceptor.java            | 273 +++++++++++++++++++++
 .../clients/impl/kv/interceptors/package-info.java |  23 ++
 .../RoutingHeaderClientInterceptorTest.java        | 270 ++++++++++++++++++++
 .../stream/protocol/ProtocolConstants.java         |   5 +
 5 files changed, 577 insertions(+), 2 deletions(-)

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 8660aab..76c7631 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.clients.impl.channel;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.Channel;
+import io.grpc.ClientInterceptor;
 import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -143,10 +144,13 @@ public class StorageServerChannel implements AutoCloseable {
      * @return an intercepted server channel.
      */
     public StorageServerChannel intercept(long scId) {
+        return intercept(new StorageContainerClientInterceptor(scId));
+    }
+
+    public StorageServerChannel intercept(ClientInterceptor... interceptors) {
         Channel interceptedChannel = ClientInterceptors.intercept(
             this.channel,
-            new StorageContainerClientInterceptor(scId));
-
+            interceptors);
         return new StorageServerChannel(
             interceptedChannel,
             this.token);
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
new file mode 100644
index 0000000..2ef970f
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.google.protobuf.UnsafeByteOperations;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A client interceptor that intercepts kv rpcs to attach routing information.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptor implements ClientInterceptor {
+
+    static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+        RANGE_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+        STREAM_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+        ROUTING_KEY,
+        IdentityBinaryMarshaller.of()
+    );
+
+    /**
+     * Table request mutator that mutates a table service rpc request to attach
+     * the routing information.
+     */
+    private interface TableRequestMutator<ReqT> {
+
+        /**
+         * Mutate the provided <tt>request</tt> to attach the given routing information.
+         *
+         * @param request table request to be mutated
+         * @param sid stream id
+         * @param rid range id
+         * @param rk routing key
+         * @return the mutated request
+         */
+        ReqT intercept(ReqT request,
+                       Long sid,
+                       Long rid,
+                       byte[] rk);
+
+    }
+
+    private static RoutingHeader.Builder newRoutingHeaderBuilder(RoutingHeader header,
+                                                                 Long sid,
+                                                                 Long rid,
+                                                                 byte[] rk) {
+        return RoutingHeader.newBuilder(header)
+                .setStreamId(sid)
+                .setRangeId(rid)
+                .setRKey(UnsafeByteOperations.unsafeWrap(rk));
+    }
+
+    private static final TableRequestMutator<PutRequest> PUT_INTERCEPTOR =
+        (request, sid, rid, rk) -> PutRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+            .build();
+
+    private static final TableRequestMutator<RangeRequest> RANGE_INTERCEPTOR =
+        (request, sid, rid, rk) -> RangeRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+            .build();
+
+    private static final TableRequestMutator<DeleteRangeRequest> DELETE_INTERCEPTOR =
+        (request, sid, rid, rk) -> DeleteRangeRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+            .build();
+
+    private static final TableRequestMutator<IncrementRequest> INCR_INTERCEPTOR =
+        (request, sid, rid, rk) -> IncrementRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+            .build();
+
+    private static final TableRequestMutator<TxnRequest> TXN_INTERCEPTOR =
+        (request, sid, rid, rk) -> TxnRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, rk))
+            .build();
+
+    @Data(staticConstructor = "of")
+    private static class InterceptorDescriptor<T extends MessageLite> {
+
+        private final Class<T> clz;
+        private final Parser<T> parser;
+        private final TableRequestMutator<T> interceptor;
+
+    }
+
+    private static Map<String, InterceptorDescriptor<?>> kvRpcMethods = new HashMap<>();
+    static {
+        kvRpcMethods.put(
+            TableServiceGrpc.getPutMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                PutRequest.class, PutRequest.parser(), PUT_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getRangeMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                RangeRequest.class, RangeRequest.parser(), RANGE_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getDeleteMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                DeleteRangeRequest.class, DeleteRangeRequest.parser(), DELETE_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getIncrementMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                IncrementRequest.class, IncrementRequest.parser(), INCR_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getTxnMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                TxnRequest.class, TxnRequest.parser(), TXN_INTERCEPTOR
+            )
+        );
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                               CallOptions callOptions,
+                                                               Channel next) {
+        if (log.isTraceEnabled()) {
+            log.trace("Intercepting method {}", method.getFullMethodName());
+        }
+        InterceptorDescriptor<?> descriptor = kvRpcMethods.get(method.getFullMethodName());
+        if (null != descriptor) {
+            return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+
+                private Long rid = null;
+                private Long sid = null;
+                private byte[] rk = null;
+
+                @Override
+                public void start(Listener<RespT> responseListener, Metadata headers) {
+                    // capture routing information from headers
+                    sid = headers.get(SID_METADATA_KEY);
+                    rid = headers.get(RID_METADATA_KEY);
+                    rk  = headers.get(RK_METADATA_KEY);
+                    if (log.isTraceEnabled()) {
+                        log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}",
+                            sid, rid, rk);
+                    }
+
+                    delegate().start(responseListener, headers);
+                }
+
+                @Override
+                public void sendMessage(ReqT message) {
+                    ReqT interceptedMessage;
+                    if (null == rid || null == sid || null == rk) {
+                        // we don't have enough information to form the new routing header
+                        // so do nothing
+                        interceptedMessage = message;
+                    } else {
+                        interceptedMessage = interceptMessage(
+                            method,
+                            descriptor,
+                            message,
+                            sid,
+                            rid,
+                            rk
+                        );
+                    }
+                    delegate().sendMessage(interceptedMessage);
+                }
+            };
+        } else {
+            return next.newCall(method, callOptions);
+        }
+    }
+
+    private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
+        MethodDescriptor<ReqT, ?> method,
+        InterceptorDescriptor<TableReqT> descriptor,
+        ReqT message,
+        Long sid,
+        Long rid,
+        byte[] rk
+    ) {
+        if (null == descriptor) {
+            return message;
+        } else {
+            try {
+                return interceptTableRequest(method, descriptor, message, sid, rid, rk);
+            } catch (IOException ioe) {
+                return message;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <ReqT, TableReqT extends MessageLite> ReqT interceptTableRequest(
+        MethodDescriptor<ReqT, ?> method,
+        InterceptorDescriptor<TableReqT> interceptor,
+        ReqT message,
+        Long sid, Long rid, byte[] rk
+    ) throws IOException {
+        // put request
+        TableReqT request;
+        if (message.getClass() == interceptor.getClz()) {
+            request = (TableReqT) message;
+        } else {
+            InputStream is = method.getRequestMarshaller().stream(message);
+            request = interceptor.getParser().parseFrom(is);
+        }
+        TableReqT interceptedMessage = interceptor.getInterceptor().intercept(
+            request, sid, rid, rk
+        );
+        if (message.getClass() == interceptor.getClz()) {
+            return (ReqT) interceptedMessage;
+        } else {
+            byte[] reqBytes = new byte[interceptedMessage.getSerializedSize()];
+            interceptedMessage.writeTo(CodedOutputStream.newInstance(reqBytes));
+            return method.getRequestMarshaller().parse(new ByteArrayInputStream(reqBytes));
+
+        }
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
new file mode 100644
index 0000000..2e56f75
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Grpc Interceptors for table service.
+ */
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
\ No newline at end of file
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
new file mode 100644
index 0000000..745118b
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.ByteString;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RoutingHeaderClientInterceptor}.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+
+    private final long streamId = 1234L;
+    private final long rangeId = 2345L;
+    private final byte[] routingKey = ("routing-key-" + System.currentTimeMillis()).getBytes(UTF_8);
+    private final AtomicReference<Object> receivedRequest = new AtomicReference<>();
+    private StorageServerChannel channel;
+
+    @Override
+    protected void doSetup() {
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void range(RangeRequest request, StreamObserver<RangeResponse> responseObserver) {
+                log.info("Received range request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(RangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void delete(DeleteRangeRequest request, StreamObserver<DeleteRangeResponse> responseObserver) {
+                log.info("Received delete range request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(DeleteRangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void txn(TxnRequest request, StreamObserver<TxnResponse> responseObserver) {
+                log.info("Received txn request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(TxnResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void increment(IncrementRequest request, StreamObserver<IncrementResponse> responseObserver) {
+                log.info("Received incr request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(IncrementResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void put(PutRequest request, StreamObserver<PutResponse> responseObserver) {
+                log.info("Received put request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(PutResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+
+        this.channel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty()
+        ).intercept(
+            new RoutingHeaderClientInterceptor(),
+            new ClientInterceptor() {
+                @Override
+                public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                                           CallOptions callOptions,
+                                                                           Channel next) {
+                    return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+                        @Override
+                        protected void checkedStart(Listener<RespT> responseListener, Metadata headers) {
+                            log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}",
+                                streamId, rangeId, new String(routingKey, UTF_8));
+                            headers.put(
+                                RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+                                rangeId
+                            );
+                            headers.put(
+                                RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+                                streamId
+                            );
+                            headers.put(
+                                RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+                                routingKey
+                            );
+                            delegate().start(responseListener, headers);
+                        }
+                    };
+                }
+            }
+        );
+    }
+
+    @Override
+    protected void doTeardown() {
+    }
+
+    @Test
+    public void testPutRequest() throws Exception {
+        PutRequest request = PutRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        PutRequest expectedRequest = PutRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        PutResponse response = this.channel.getTableService().put(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testRangeRequest() throws Exception {
+        RangeRequest request = RangeRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        RangeRequest expectedRequest = RangeRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        RangeResponse response = this.channel.getTableService()
+            .range(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testDeleteRangeRequest() throws Exception {
+        DeleteRangeRequest request = DeleteRangeRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        DeleteRangeRequest expectedRequest = DeleteRangeRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        DeleteRangeResponse response = this.channel.getTableService()
+            .delete(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testIncrementRequest() throws Exception {
+        IncrementRequest request = IncrementRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        IncrementRequest expectedRequest = IncrementRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        IncrementResponse response = this.channel.getTableService()
+            .increment(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testTxnRequest() throws Exception {
+        TxnRequest request = TxnRequest.newBuilder()
+            .build();
+        TxnRequest expectedRequest = TxnRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        TxnResponse response = this.channel.getTableService().txn(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), response.getHeader().getRoutingHeader());
+    }
+
+}
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0e6900c..0088c4b 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -110,4 +110,9 @@ public final class ProtocolConstants {
 
     // storage container request metadata key
     public static final String SC_ID_KEY = "sc-id" + Metadata.BINARY_HEADER_SUFFIX;
+
+    // request metadata key for routing requests
+    public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX;
+    public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX;
+    public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX;
 }