You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 22:04:00 UTC

[bookkeeper] 06/09: [TABLE SERVICE] remove StorageContainerRequest and StorageContainerResponse

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 76b6857725bf25e77e4db285a27692c13c022ebb
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 30 10:18:15 2018 -0700

    [TABLE SERVICE] remove StorageContainerRequest and StorageContainerResponse
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
     #1448 uses reverse proxy for serving storage container requests and responses. so the `StorageContainerRequest` and `StorageContainerResponse`
     become redundant.
    
    *Solution*
    
    removes `StorageContainerRequest` and `StorageContainerResponse`
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1452 from sijie/remove_storage_container_request_response
---
 .../clients/impl/channel/StorageServerChannel.java |   4 +-
 .../clients/impl/internal/MetaRangeClientImpl.java |   6 +-
 .../internal/mr/MetaRangeRequestProcessor.java     |  46 ++--
 .../impl/internal/TestMetaRangeClientImpl.java     |  20 +-
 .../clients/impl/kv/DeleteRequestProcessor.java    |  94 ++++++++
 .../clients/impl/kv/IncrementRequestProcessor.java |  94 ++++++++
 .../apache/bookkeeper/clients/impl/kv/KvUtils.java |  47 ----
 .../clients/impl/kv/PByteBufTableRangeImpl.java    |  52 ++--
 .../clients/impl/kv/PutRequestProcessor.java       |  94 ++++++++
 .../clients/impl/kv/RangeRequestProcessor.java     |  94 ++++++++
 .../clients/impl/kv/TableRequestProcessor.java     |  93 -------
 .../clients/impl/kv/TxnRequestProcessor.java       |  94 ++++++++
 .../impl/kv/DeleteRequestProcessorTest.java        | 115 +++++++++
 .../impl/kv/IncrementRequestProcessorTest.java     | 115 +++++++++
 .../clients/impl/kv/PutRequestProcessorTest.java   | 115 +++++++++
 .../clients/impl/kv/RangeRequestProcessorTest.java | 115 +++++++++
 .../clients/impl/kv/TableRequestProcessorTest.java | 217 -----------------
 .../bookkeeper/clients/impl/kv/TestKvUtils.java    |  65 -----
 .../clients/impl/kv/TxnRequestProcessorTest.java   | 115 +++++++++
 .../stream/protocol/util/ProtoUtils.java           |  25 +-
 stream/proto/src/main/proto/kv_rpc.proto           |  20 +-
 stream/proto/src/main/proto/storage.proto          |  50 +---
 .../stream/storage/api/kv/TableStore.java          |  22 +-
 .../storage/api/metadata/MetaRangeStore.java       |   6 +-
 .../storage/impl/grpc/GrpcMetaRangeService.java    |  21 +-
 .../stream/storage/impl/grpc/GrpcTableService.java |  96 ++++++--
 .../handler/StorageContainerResponseHandler.java   |  44 ----
 .../stream/storage/impl/kv/TableStoreImpl.java     | 105 ++++----
 .../stream/storage/impl/kv/TableStoreUtils.java    |   5 +
 .../storage/impl/metadata/MetaRangeStoreImpl.java  |  35 +--
 .../impl/service/FailRequestRangeStoreService.java |  59 +++--
 .../impl/service/RangeStoreServiceImpl.java        |  57 ++---
 .../impl/TestStorageContainerStoreImpl.java        | 135 +++++------
 .../impl/grpc/TestGrpcMetaRangeService.java        |  34 +--
 .../storage/impl/grpc/TestGrpcTableService.java    | 144 ++++++-----
 .../TestStorageContainerResponseHandler.java       | 115 ---------
 .../stream/storage/impl/kv/TableStoreImplTest.java | 267 +++++++++------------
 .../impl/metadata/MetaRangeStoreImplTest.java      |  28 +--
 .../impl/service/RangeStoreServiceImplTest.java    | 155 ++++++------
 39 files changed, 1690 insertions(+), 1328 deletions(-)

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index d9e9388..8660aab 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,14 +31,14 @@ import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterc
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceFutureStub;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
 
 /**
  * A channel connected to a range server.
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index 10481f1..7e61b58 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -78,10 +78,8 @@ class MetaRangeClientImpl implements MetaRangeClient {
     @Override
     public CompletableFuture<HashStreamRanges> getActiveDataRanges() {
         return MetaRangeRequestProcessor.of(
-            createGetActiveRangesRequest(
-                scClient.getStorageContainerId(),
-                streamProps),
-            (response) -> createActiveRanges(response.getGetActiveRangesResp()),
+            createGetActiveRangesRequest(streamProps),
+            (response) -> createActiveRanges(response),
             scClient,
             executor,
             backoffPolicy
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index cb25e84..39e9a5a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -21,37 +21,36 @@ package org.apache.bookkeeper.clients.impl.internal.mr;
 import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createMetaRangeException;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 
 /**
  * Request Processor processing meta range request.
  */
 public class MetaRangeRequestProcessor<RespT>
-    extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
+    extends ListenableFutureRpcProcessor<GetActiveRangesRequest, GetActiveRangesResponse, RespT> {
 
     public static <T> MetaRangeRequestProcessor<T> of(
-        StorageContainerRequest request,
-        Function<StorageContainerResponse, T> responseFunc,
+        GetActiveRangesRequest request,
+        Function<GetActiveRangesResponse, T> responseFunc,
         StorageContainerChannel channel,
         ScheduledExecutorService executor,
         Backoff.Policy backoffPolicy) {
         return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
-    private final StorageContainerRequest request;
-    private final Function<StorageContainerResponse, RespT> responseFunc;
+    private final GetActiveRangesRequest request;
+    private final Function<GetActiveRangesResponse, RespT> responseFunc;
 
-    private MetaRangeRequestProcessor(StorageContainerRequest request,
-                                      Function<StorageContainerResponse, RespT> responseFunc,
+    private MetaRangeRequestProcessor(GetActiveRangesRequest request,
+                                      Function<GetActiveRangesResponse, RespT> responseFunc,
                                       StorageContainerChannel channel,
                                       ScheduledExecutorService executor,
                                       Backoff.Policy backoffPolicy) {
@@ -61,34 +60,23 @@ public class MetaRangeRequestProcessor<RespT>
     }
 
     @Override
-    protected StorageContainerRequest createRequest() {
+    protected GetActiveRangesRequest createRequest() {
         return request;
     }
 
     @Override
-    protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
-                                                                 StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case GET_ACTIVE_RANGES_REQ:
-                return rsChannel.getMetaRangeService().getActiveRanges(request);
-            default:
-                SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
-                respFuture.setException(new Exception("Unknown request " + request));
-                return respFuture;
-        }
+    protected ListenableFuture<GetActiveRangesResponse> sendRPC(StorageServerChannel rsChannel,
+                                                                GetActiveRangesRequest request) {
+        return rsChannel.getMetaRangeService().getActiveRanges(request);
     }
 
-    private String getIdentifier(StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case GET_ACTIVE_RANGES_REQ:
-                return "" + request.getGetActiveRangesReq().getStreamId();
-            default:
-                return "";
-        }
+    private String getIdentifier(GetActiveRangesRequest request) {
+
+        return "" + request.getStreamId();
     }
 
     @Override
-    protected RespT processResponse(StorageContainerResponse response) throws Exception {
+    protected RespT processResponse(GetActiveRangesResponse response) throws Exception {
         if (StatusCode.SUCCESS == response.getCode()) {
             return responseFunc.apply(response);
         }
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
index 189b849..9716162 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
@@ -45,13 +45,12 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.RelationType;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.junit.Test;
 
 /**
@@ -129,21 +128,18 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
 
         // create response
         GetActiveRangesResponse getActiveRangesResponse = GetActiveRangesResponse.newBuilder()
+            .setCode(StatusCode.SUCCESS)
             .addRanges(
                 buildRelatedRange(Long.MIN_VALUE, 0L, 123L, 1L, Lists.newArrayList(113L))
             ).addRanges(
                 buildRelatedRange(0L, Long.MAX_VALUE, 124L, 2L, Lists.newArrayList(114L))
             ).build();
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setGetActiveRangesResp(getActiveRangesResponse)
-            .build();
 
         MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
             @Override
-            public void getActiveRanges(StorageContainerRequest request,
-                                        StreamObserver<StorageContainerResponse> responseObserver) {
-                responseObserver.onNext(response);
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        StreamObserver<GetActiveRangesResponse> responseObserver) {
+                responseObserver.onNext(getActiveRangesResponse);
                 responseObserver.onCompleted();
             }
         };
@@ -154,7 +150,7 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
             Optional.empty());
         serviceFuture.complete(rsChannel);
 
-        HashStreamRanges expectedStream = createActiveRanges(response.getGetActiveRangesResp());
+        HashStreamRanges expectedStream = createActiveRanges(getActiveRangesResponse);
         CompletableFuture<HashStreamRanges> getFuture = metaRangeClient.getActiveDataRanges();
         assertEquals(expectedStream, getFuture.get());
     }
@@ -166,8 +162,8 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
 
         MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
             @Override
-            public void getActiveRanges(StorageContainerRequest request,
-                                        StreamObserver<StorageContainerResponse> responseObserver) {
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        StreamObserver<GetActiveRangesResponse> responseObserver) {
                 responseObserver.onError(new StatusRuntimeException(Status.INTERNAL));
             }
         };
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
new file mode 100644
index 0000000..88287df
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class DeleteRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<DeleteRangeRequest, DeleteRangeResponse, RespT> {
+
+    public static <T> DeleteRequestProcessor<T> of(
+        DeleteRangeRequest request,
+        Function<DeleteRangeResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new DeleteRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final DeleteRangeRequest request;
+    private final Function<DeleteRangeResponse, RespT> responseFunc;
+
+    private DeleteRequestProcessor(DeleteRangeRequest request,
+                                   Function<DeleteRangeResponse, RespT> respFunc,
+                                   StorageContainerChannel channel,
+                                   ScheduledExecutorService executor,
+                                   Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected DeleteRangeRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<DeleteRangeResponse> sendRPC(StorageServerChannel rsChannel,
+                                                            DeleteRangeRequest request) {
+        return rsChannel.getTableService().delete(request);
+    }
+
+    @Override
+    protected RespT processResponse(DeleteRangeResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
new file mode 100644
index 0000000..f41c97b
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class IncrementRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<IncrementRequest, IncrementResponse, RespT> {
+
+    public static <T> IncrementRequestProcessor<T> of(
+        IncrementRequest request,
+        Function<IncrementResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new IncrementRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final IncrementRequest request;
+    private final Function<IncrementResponse, RespT> responseFunc;
+
+    private IncrementRequestProcessor(IncrementRequest request,
+                                      Function<IncrementResponse, RespT> respFunc,
+                                      StorageContainerChannel channel,
+                                      ScheduledExecutorService executor,
+                                      Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected IncrementRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<IncrementResponse> sendRPC(StorageServerChannel rsChannel,
+                                                          IncrementRequest request) {
+        return rsChannel.getTableService().increment(request);
+    }
+
+    @Override
+    protected RespT processResponse(IncrementResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
index 2fd27ce..279219b 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
@@ -52,9 +52,7 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 
 /**
  * K/V related utils.
@@ -86,15 +84,6 @@ public final class KvUtils {
         return Lists.transform(kvs, kv -> fromProtoKeyValue(kv, kvFactory));
     }
 
-    public static StorageContainerRequest newKvRangeRequest(
-        long scId,
-        RangeRequest.Builder rangeReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvRangeReq(rangeReq)
-            .build();
-    }
-
     public static RangeRequest.Builder newRangeRequest(ByteBuf key, RangeOption<ByteBuf> option) {
         RangeRequest.Builder builder = RangeRequest.newBuilder()
             .setKey(toProtoKey(key))
@@ -121,15 +110,6 @@ public final class KvUtils {
             .kvs(fromProtoKeyValues(response.getKvsList(), kvFactory));
     }
 
-    public static StorageContainerRequest newKvPutRequest(
-        long scId,
-        PutRequest.Builder putReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvPutReq(putReq)
-            .build();
-    }
-
     public static PutRequest.Builder newPutRequest(ByteBuf key,
                                                    ByteBuf value,
                                                    PutOption<ByteBuf> option) {
@@ -150,15 +130,6 @@ public final class KvUtils {
         return result;
     }
 
-    public static StorageContainerRequest newKvIncrementRequest(
-        long scId,
-        IncrementRequest.Builder putReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvIncrReq(putReq)
-            .build();
-    }
-
     public static IncrementRequest.Builder newIncrementRequest(ByteBuf key,
                                                                long amount,
                                                                IncrementOption<ByteBuf> option) {
@@ -177,15 +148,6 @@ public final class KvUtils {
         return result;
     }
 
-    public static StorageContainerRequest newKvDeleteRequest(
-        long scId,
-        DeleteRangeRequest.Builder deleteReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvDeleteReq(deleteReq)
-            .build();
-    }
-
     public static DeleteRangeRequest.Builder newDeleteRequest(ByteBuf key, DeleteOption<ByteBuf> option) {
         DeleteRangeRequest.Builder builder = DeleteRangeRequest.newBuilder()
             .setKey(UnsafeByteOperations.unsafeWrap(key.nioBuffer()))
@@ -310,15 +272,6 @@ public final class KvUtils {
         return reqBuilder;
     }
 
-    public static StorageContainerRequest newKvTxnRequest(
-        long scId,
-        TxnRequest.Builder txnReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvTxnReq(txnReq)
-            .build();
-    }
-
     public static TxnResult<ByteBuf, ByteBuf> newKvTxnResult(
         TxnResponse txnResponse,
         ResultFactory<ByteBuf, ByteBuf> resultFactory,
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 03c64f7..1f07b80 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -94,12 +94,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
         if (null != option.endKey()) {
             option.endKey().retain();
         }
-        return TableRequestProcessor.of(
-            KvUtils.newKvRangeRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newRangeRequest(lKey, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
+        return RangeRequestProcessor.of(
+            KvUtils.newRangeRequest(lKey, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newRangeResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -120,12 +119,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
         pKey.retain();
         lKey.retain();
         value.retain();
-        return TableRequestProcessor.of(
-            KvUtils.newKvPutRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newPutRequest(lKey, value, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
+        return PutRequestProcessor.of(
+            KvUtils.newPutRequest(lKey, value, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newPutResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -145,12 +143,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
         if (null != option.endKey()) {
             option.endKey().retain();
         }
-        return TableRequestProcessor.of(
-            KvUtils.newKvDeleteRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newDeleteRequest(lKey, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
+        return DeleteRequestProcessor.of(
+            KvUtils.newDeleteRequest(lKey, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newDeleteResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -170,12 +167,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                                                                           IncrementOption<ByteBuf> option) {
         pKey.retain();
         lKey.retain();
-        return TableRequestProcessor.of(
-            KvUtils.newKvIncrementRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newIncrementRequest(lKey, amount, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
+        return IncrementRequestProcessor.of(
+            KvUtils.newIncrementRequest(lKey, amount, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newIncrementResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -248,11 +244,9 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
 
         @Override
         public CompletableFuture<TxnResult<ByteBuf, ByteBuf>> commit() {
-            return TableRequestProcessor.of(
-                KvUtils.newKvTxnRequest(
-                    scChannel.getStorageContainerId(),
-                    txnBuilder.setHeader(newRoutingHeader(pKey))),
-                response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
+            return TxnRequestProcessor.of(
+                txnBuilder.setHeader(newRoutingHeader(pKey)).build(),
+                response -> KvUtils.newKvTxnResult(response, resultFactory, kvFactory),
                 scChannel,
                 executor,
                 backoffPolicy
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
new file mode 100644
index 0000000..38ae6fc
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class PutRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<PutRequest, PutResponse, RespT> {
+
+    public static <T> PutRequestProcessor<T> of(
+        PutRequest request,
+        Function<PutResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new PutRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final PutRequest request;
+    private final Function<PutResponse, RespT> responseFunc;
+
+    private PutRequestProcessor(PutRequest request,
+                                Function<PutResponse, RespT> respFunc,
+                                StorageContainerChannel channel,
+                                ScheduledExecutorService executor,
+                                Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected PutRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<PutResponse> sendRPC(StorageServerChannel rsChannel,
+                                                    PutRequest request) {
+        return rsChannel.getTableService().put(request);
+    }
+
+    @Override
+    protected RespT processResponse(PutResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
new file mode 100644
index 0000000..de2ac87
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class RangeRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<RangeRequest, RangeResponse, RespT> {
+
+    public static <T> RangeRequestProcessor<T> of(
+        RangeRequest request,
+        Function<RangeResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new RangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final RangeRequest request;
+    private final Function<RangeResponse, RespT> responseFunc;
+
+    private RangeRequestProcessor(RangeRequest request,
+                                  Function<RangeResponse, RespT> respFunc,
+                                  StorageContainerChannel channel,
+                                  ScheduledExecutorService executor,
+                                  Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected RangeRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<RangeResponse> sendRPC(StorageServerChannel rsChannel,
+                                                      RangeRequest request) {
+        return rsChannel.getTableService().range(request);
+    }
+
+    @Override
+    protected RespT processResponse(RangeResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
deleted file mode 100644
index f5e0bc5..0000000
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.clients.impl.kv;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import org.apache.bookkeeper.clients.exceptions.InternalServerException;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Request Processor processing table request.
- */
-public class TableRequestProcessor<RespT>
-    extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
-
-    public static <T> TableRequestProcessor<T> of(
-        StorageContainerRequest request,
-        Function<StorageContainerResponse, T> responseFunc,
-        StorageContainerChannel channel,
-        ScheduledExecutorService executor,
-        Policy backoffPolicy) {
-        return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
-    }
-
-    private final StorageContainerRequest request;
-    private final Function<StorageContainerResponse, RespT> responseFunc;
-
-    private TableRequestProcessor(StorageContainerRequest request,
-                                  Function<StorageContainerResponse, RespT> respFunc,
-                                  StorageContainerChannel channel,
-                                  ScheduledExecutorService executor,
-                                  Policy backoffPolicy) {
-        super(channel, executor, backoffPolicy);
-        this.request = request;
-        this.responseFunc = respFunc;
-    }
-
-    @Override
-    protected StorageContainerRequest createRequest() {
-        return request;
-    }
-
-    @Override
-    protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
-                                                                 StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case KV_RANGE_REQ:
-                return rsChannel.getTableService().range(request);
-            case KV_PUT_REQ:
-                return rsChannel.getTableService().put(request);
-            case KV_DELETE_REQ:
-                return rsChannel.getTableService().delete(request);
-            case KV_INCR_REQ:
-                return rsChannel.getTableService().increment(request);
-            case KV_TXN_REQ:
-                return rsChannel.getTableService().txn(request);
-            default:
-                SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
-                respFuture.setException(new Exception("Unknown request " + request));
-                return respFuture;
-        }
-    }
-
-    @Override
-    protected RespT processResponse(StorageContainerResponse response) throws Exception {
-        if (StatusCode.SUCCESS == response.getCode()) {
-            return responseFunc.apply(response);
-        }
-        throw new InternalServerException("Encountered internal server exception : code = "
-            + response.getCode());
-    }
-}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
new file mode 100644
index 0000000..a3ed209
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class TxnRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<TxnRequest, TxnResponse, RespT> {
+
+    public static <T> TxnRequestProcessor<T> of(
+        TxnRequest request,
+        Function<TxnResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new TxnRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final TxnRequest request;
+    private final Function<TxnResponse, RespT> responseFunc;
+
+    private TxnRequestProcessor(TxnRequest request,
+                                Function<TxnResponse, RespT> respFunc,
+                                StorageContainerChannel channel,
+                                ScheduledExecutorService executor,
+                                Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected TxnRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<TxnResponse> sendRPC(StorageServerChannel rsChannel,
+                                                    TxnRequest request) {
+        return rsChannel.getTableService().txn(request);
+    }
+
+    @Override
+    protected RespT processResponse(TxnResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
new file mode 100644
index 0000000..8b01b9b
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DeleteRequestProcessor}.
+ */
+public class DeleteRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected DeleteRangeResponse newSuccessResponse() {
+        return DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected DeleteRangeRequest newRequest() {
+        return DeleteRangeRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        DeleteRangeResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void delete(DeleteRangeRequest request,
+                               StreamObserver<DeleteRangeResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<DeleteRangeResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        DeleteRangeRequest request = newRequest();
+
+        DeleteRequestProcessor<String> processor = DeleteRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
new file mode 100644
index 0000000..1b1ec49
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link IncrementRequestProcessor}.
+ */
+public class IncrementRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected IncrementResponse newSuccessResponse() {
+        return IncrementResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected IncrementRequest newRequest() {
+        return IncrementRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        IncrementResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void increment(IncrementRequest request,
+                                  StreamObserver<IncrementResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<IncrementResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        IncrementRequest request = newRequest();
+
+        IncrementRequestProcessor<String> processor = IncrementRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
new file mode 100644
index 0000000..3481a26
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link PutRequestProcessor}.
+ */
+public class PutRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected PutResponse newSuccessResponse() {
+        return PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected PutRequest newRequest() {
+        return PutRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        PutResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void put(PutRequest request,
+                            StreamObserver<PutResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<PutResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        PutRequest request = newRequest();
+
+        PutRequestProcessor<String> processor = PutRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
new file mode 100644
index 0000000..9e53df7
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link RangeRequestProcessor}.
+ */
+public class RangeRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected RangeResponse newSuccessResponse() {
+        return RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected RangeRequest newRequest() {
+        return RangeRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        RangeResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void range(RangeRequest request,
+                              StreamObserver<RangeResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<RangeResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        RangeRequest request = newRequest();
+
+        RangeRequestProcessor<String> processor = RangeRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
deleted file mode 100644
index bec76e5..0000000
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.clients.impl.kv;
-
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import lombok.Cleanup;
-import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ClientConstants;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.junit.Test;
-
-/**
- * Unit test of {@link TableRequestProcessor}.
- */
-public class TableRequestProcessorTest extends GrpcClientTestBase {
-
-    @Override
-    protected void doSetup() throws Exception {
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-    }
-
-    @Test
-    public void testProcessRangeRequest() throws Exception {
-        testProcess(KV_RANGE_REQ);
-    }
-
-    @Test
-    public void testProcessPutRequest() throws Exception {
-        testProcess(KV_PUT_REQ);
-    }
-
-    @Test
-    public void testProcessDeleteRequest() throws Exception {
-        testProcess(KV_DELETE_REQ);
-    }
-
-    @Test
-    public void testProcessIncrementRequest() throws Exception {
-        testProcess(KV_INCR_REQ);
-    }
-
-    @Test
-    public void testProcessTxnRequest() throws Exception {
-        testProcess(KV_TXN_REQ);
-    }
-
-    private void testProcess(RequestCase type) throws Exception {
-        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-
-        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
-
-        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
-        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
-
-        StorageContainerResponse.Builder respBuilder = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS);
-
-        switch (type) {
-            case KV_PUT_REQ:
-                respBuilder.setKvPutResp(PutResponse.newBuilder().build());
-                break;
-            case KV_DELETE_REQ:
-                respBuilder.setKvDeleteResp(DeleteRangeResponse.newBuilder().build());
-                break;
-            case KV_RANGE_REQ:
-                respBuilder.setKvRangeResp(RangeResponse.newBuilder().build());
-                break;
-            case KV_INCR_REQ:
-                respBuilder.setKvIncrResp(IncrementResponse.newBuilder().build());
-                break;
-            case KV_TXN_REQ:
-                respBuilder.setKvTxnResp(TxnResponse.newBuilder().build());
-                break;
-            default:
-                break;
-        }
-        StorageContainerResponse response = respBuilder.build();
-
-        AtomicReference<StorageContainerRequest> receivedRequest = new AtomicReference<>(null);
-        AtomicReference<RequestCase> receivedRequestType = new AtomicReference<>(null);
-        TableServiceImplBase tableService = new TableServiceImplBase() {
-            @Override
-            public void range(StorageContainerRequest request,
-                              StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_RANGE_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void put(StorageContainerRequest request,
-                            StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_PUT_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void delete(StorageContainerRequest request,
-                               StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_DELETE_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void txn(StorageContainerRequest request,
-                            StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_TXN_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void increment(StorageContainerRequest request,
-                                  StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_INCR_REQ);
-                complete(responseObserver);
-            }
-
-            private void complete(StreamObserver<StorageContainerResponse> responseStreamObserver) {
-                responseStreamObserver.onNext(response);
-                responseStreamObserver.onCompleted();
-            }
-        };
-        serviceRegistry.addService(tableService.bindService());
-        StorageServerChannel ssChannel = new StorageServerChannel(
-            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
-            Optional.empty());
-        serverChannelFuture.complete(ssChannel);
-
-        StorageContainerRequest.Builder requestBuilder = StorageContainerRequest.newBuilder();
-        switch (type) {
-            case KV_PUT_REQ:
-                requestBuilder.setKvPutReq(PutRequest.newBuilder().build());
-                break;
-            case KV_DELETE_REQ:
-                requestBuilder.setKvDeleteReq(DeleteRangeRequest.newBuilder().build());
-                break;
-            case KV_RANGE_REQ:
-                requestBuilder.setKvRangeReq(RangeRequest.newBuilder().build());
-                break;
-            case KV_INCR_REQ:
-                requestBuilder.setKvIncrReq(IncrementRequest.newBuilder().build());
-                break;
-            case KV_TXN_REQ:
-                requestBuilder.setKvTxnReq(TxnRequest.newBuilder().build());
-                break;
-            default:
-                break;
-        }
-        StorageContainerRequest request = requestBuilder.build();
-
-        TableRequestProcessor<String> processor = TableRequestProcessor.of(
-            request,
-            resp -> "test",
-            scChannel,
-            scheduler,
-            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
-        assertEquals("test", FutureUtils.result(processor.process()));
-        assertSame(request, receivedRequest.get());
-        assertEquals(type, receivedRequestType.get());
-    }
-
-}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
index 3cff0a6..d2df334 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
@@ -17,16 +17,8 @@ package org.apache.bookkeeper.clients.impl.kv;
 import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newDeleteRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvDeleteRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvPutRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvRangeRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newPutRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newRangeRequest;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -45,7 +37,6 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.junit.Test;
 
 /**
@@ -89,26 +80,6 @@ public class TestKvUtils {
     }
 
     @Test
-    public void testNewKvRangeRequest() {
-        try (RangeOption<ByteBuf> rangeOption = optionFactory.newRangeOption()
-            .endKey(key.retainedDuplicate())
-            .countOnly(true)
-            .keysOnly(true)
-            .limit(10)
-            .maxCreateRev(1234L)
-            .minCreateRev(234L)
-            .maxModRev(2345L)
-            .minModRev(1235L)
-            .build()) {
-            RangeRequest.Builder rrBuilder = newRangeRequest(key, rangeOption);
-            StorageContainerRequest request = newKvRangeRequest(scId, rrBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_RANGE_REQ, request.getRequestCase());
-            assertEquals(rrBuilder.build(), request.getKvRangeReq());
-        }
-    }
-
-    @Test
     public void testNewPutRequest() {
         try (PutOption<ByteBuf> option = Options.putAndGet()) {
             PutRequest rr = newPutRequest(key, value, option).build();
@@ -120,17 +91,6 @@ public class TestKvUtils {
     }
 
     @Test
-    public void testNewKvPutRequest() {
-        try (PutOption<ByteBuf> option = Options.putAndGet()) {
-            PutRequest.Builder putBuilder = newPutRequest(key, value, option);
-            StorageContainerRequest request = newKvPutRequest(scId, putBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_PUT_REQ, request.getRequestCase());
-            assertEquals(putBuilder.build(), request.getKvPutReq());
-        }
-    }
-
-    @Test
     public void testNewIncrementRequest() {
         try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
             IncrementRequest rr = newIncrementRequest(key, 100L, option).build();
@@ -142,17 +102,6 @@ public class TestKvUtils {
     }
 
     @Test
-    public void testNewKvIncrementRequest() {
-        try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
-            IncrementRequest.Builder incrBuilder = newIncrementRequest(key, 100L, option);
-            StorageContainerRequest request = newKvIncrementRequest(scId, incrBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_INCR_REQ, request.getRequestCase());
-            assertEquals(incrBuilder.build(), request.getKvIncrReq());
-        }
-    }
-
-    @Test
     public void testNewDeleteRequest() {
         try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
             .endKey(key.retainedDuplicate())
@@ -166,18 +115,4 @@ public class TestKvUtils {
         }
     }
 
-    @Test
-    public void testNewKvDeleteRequest() {
-        try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
-            .endKey(key.retainedDuplicate())
-            .prevKv(true)
-            .build()) {
-            DeleteRangeRequest.Builder delBuilder = newDeleteRequest(key, option);
-            StorageContainerRequest request = newKvDeleteRequest(scId, delBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_DELETE_REQ, request.getRequestCase());
-            assertEquals(delBuilder.build(), request.getKvDeleteReq());
-        }
-    }
-
 }
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
new file mode 100644
index 0000000..2efecd0
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link TxnRequestProcessor}.
+ */
+public class TxnRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected TxnResponse newSuccessResponse() {
+        return TxnResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected TxnRequest newRequest() {
+        return TxnRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        TxnResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void txn(TxnRequest request,
+                            StreamObserver<TxnResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<TxnResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        TxnRequest request = newRequest();
+
+        TxnRequestProcessor<String> processor = TxnRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
index 35ed903..687388f 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointReq
 import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -224,25 +223,17 @@ public class ProtoUtils {
     // Meta Range API
     //
 
-    private static StorageContainerRequest.Builder newScRequestBuilder(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId);
-    }
-
-    public static StorageContainerRequest createGetActiveRangesRequest(long scId, long streamId) {
-        return newScRequestBuilder(scId)
-            .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
-                .setStreamId(streamId))
-            .build();
+    public static GetActiveRangesRequest createGetActiveRangesRequest(long streamId) {
+        return GetActiveRangesRequest.newBuilder()
+                .setStreamId(streamId)
+                .build();
     }
 
-    public static StorageContainerRequest createGetActiveRangesRequest(long scId,
-                                                                       StreamProperties streamProps) {
-        return newScRequestBuilder(scId)
-            .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
+    public static GetActiveRangesRequest createGetActiveRangesRequest(StreamProperties streamProps) {
+        return GetActiveRangesRequest.newBuilder()
                 .setStreamId(streamProps.getStreamId())
-                .setStreamProps(streamProps))
-            .build();
+                .setStreamProps(streamProps)
+                .build();
     }
 
     //
diff --git a/stream/proto/src/main/proto/kv_rpc.proto b/stream/proto/src/main/proto/kv_rpc.proto
index c324bcb..01e5cb6 100644
--- a/stream/proto/src/main/proto/kv_rpc.proto
+++ b/stream/proto/src/main/proto/kv_rpc.proto
@@ -18,6 +18,7 @@
 syntax = "proto3";
 
 import "kv.proto";
+import "storage.proto";
 
 package bookkeeper.proto.kv.rpc;
 
@@ -34,7 +35,7 @@ message RoutingHeader {
   bytes r_key           = 3;
 }
 
-service KV {
+service TableService {
   // Range gets the keys in the range from the key-value store.
   // NOT supported yet.
   rpc Range(RangeRequest) returns (RangeResponse) {}
@@ -47,7 +48,7 @@ service KV {
   // DeleteRange deletes the given range from the key-value store.
   // A delete request increments the revision of the key-value store
   // and generates a delete event in the event history for every deleted key.
-  rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
+  rpc Delete(DeleteRangeRequest) returns (DeleteRangeResponse) {}
 
   // Txn processes multiple requests in a single transaction.
   // A txn request increments the revision of the key-value store
@@ -55,20 +56,17 @@ service KV {
   // It is not allowed to modify the same key several times within one txn.
   rpc Txn(TxnRequest) returns (TxnResponse) {}
 
+  // Increment increments the amount associated with the keys
+  rpc Increment(IncrementRequest) returns (IncrementResponse) {}
+
 }
 
 message ResponseHeader {
-  // cluster_id is the ID of the cluster which sent the response.
-  uint64 cluster_id = 1;
-  // member_id is the ID of the member which sent the response.
-  uint64 member_id = 2;
-  // revision is the key-value store revision when the request was applied.
-  int64 revision = 3;
-  // raft_term is the raft term when the request was applied.
-  uint64 raft_term = 4;
+  // Status Code
+  storage.StatusCode code               = 1;
 
   // routing header
-  RoutingHeader routing_header = 99;
+  RoutingHeader routing_header  = 99;
 }
 
 message RangeRequest {
diff --git a/stream/proto/src/main/proto/storage.proto b/stream/proto/src/main/proto/storage.proto
index b84ec2e..7570d29 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -19,7 +19,6 @@ syntax = "proto3";
 
 import "common.proto";
 import "stream.proto";
-import "kv_rpc.proto";
 
 package bookkeeper.proto.storage;
 
@@ -87,7 +86,8 @@ message GetActiveRangesRequest {
 }
 
 message GetActiveRangesResponse {
-    repeated RelatedRanges ranges       = 1;
+    StatusCode code                     = 1;
+    repeated RelatedRanges ranges       = 2;
 }
 
 enum RelationType {
@@ -104,7 +104,7 @@ message RelatedRanges {
 
 // public service for other operations in range server
 service MetaRangeService {
-    rpc GetActiveRanges(StorageContainerRequest)        returns (StorageContainerResponse);
+    rpc GetActiveRanges(GetActiveRangesRequest)        returns (GetActiveRangesResponse);
 }
 
 //
@@ -188,18 +188,6 @@ service RootRangeService {
 }
 
 //
-// KV Service
-//
-
-service TableService {
-    rpc Range(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Put(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Delete(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Increment(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Txn(StorageContainerRequest) returns (StorageContainerResponse) {}
-}
-
-//
 // StorageContainerService
 //
 
@@ -234,35 +222,3 @@ service StorageContainerService {
     // Get the storage container endpoints
     rpc GetStorageContainerEndpoint(GetStorageContainerEndpointRequest) returns (GetStorageContainerEndpointResponse);
 }
-
-message StorageContainerRequest {
-    int64 sc_id                 = 1;
-
-    oneof request {
-        // stream metadata operations
-        GetActiveRangesRequest get_active_ranges_req            = 200;
-
-        // kv operations
-        kv.rpc.RangeRequest kv_range_req                        = 400;
-        kv.rpc.PutRequest kv_put_req                            = 401;
-        kv.rpc.DeleteRangeRequest kv_delete_req                 = 402;
-        kv.rpc.TxnRequest kv_txn_req                            = 403;
-        kv.rpc.IncrementRequest kv_incr_req                     = 404;
-    }
-}
-
-message StorageContainerResponse {
-    StatusCode code             = 1;
-
-    oneof response {
-        // stream metadata operations
-        GetActiveRangesResponse get_active_ranges_resp          = 200;
-
-        // kv operations
-        kv.rpc.RangeResponse kv_range_resp                      = 400;
-        kv.rpc.PutResponse kv_put_resp                          = 401;
-        kv.rpc.DeleteRangeResponse kv_delete_resp               = 402;
-        kv.rpc.TxnResponse kv_txn_resp                          = 403;
-        kv.rpc.IncrementResponse kv_incr_resp                   = 404;
-    }
-}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
index 1eaac62..a584a37 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
@@ -18,22 +18,30 @@
 package org.apache.bookkeeper.stream.storage.api.kv;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 
 /**
  * The table store that stores and serves tables.
  */
 public interface TableStore {
 
-    CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request);
+    CompletableFuture<RangeResponse> range(RangeRequest request);
 
-    CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request);
+    CompletableFuture<PutResponse> put(PutRequest request);
 
-    CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request);
+    CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request);
 
-    CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request);
+    CompletableFuture<TxnResponse> txn(TxnRequest request);
 
-    CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request);
+    CompletableFuture<IncrementResponse> incr(IncrementRequest request);
 
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
index 4215dd5..7ef664d 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
@@ -19,8 +19,8 @@
 package org.apache.bookkeeper.stream.storage.api.metadata;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 
 /**
  * The metadata store that store ranges.
@@ -33,6 +33,6 @@ public interface MetaRangeStore {
      * @param request the request
      * @return the active ranges
      */
-    CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request);
+    CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request);
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index 2fe81e7..1fb0e44 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -19,12 +19,12 @@ package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
 
 /**
  * The gRPC protocol based range service.
@@ -44,10 +44,17 @@ public class GrpcMetaRangeService extends MetaRangeServiceImplBase {
     //
 
     @Override
-    public void getActiveRanges(StorageContainerRequest request,
-                                StreamObserver<StorageContainerResponse> responseObserver) {
+    public void getActiveRanges(GetActiveRangesRequest request,
+                                StreamObserver<GetActiveRangesResponse> responseObserver) {
         rangeStore.getActiveRanges(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<GetActiveRangesResponse>(responseObserver) {
+                @Override
+                protected GetActiveRangesResponse createErrorResp(Throwable cause) {
+                    return GetActiveRangesResponse.newBuilder()
+                        .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                        .build();
+                }
+            });
     }
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
index e86759d..f26a7bd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
@@ -19,11 +19,21 @@ package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
 
 /**
  * The gRPC protocol based k/v service.
@@ -39,35 +49,87 @@ public class GrpcTableService extends TableServiceImplBase {
     }
 
     @Override
-    public void range(StorageContainerRequest request,
-                      StreamObserver<StorageContainerResponse> responseObserver) {
+    public void range(RangeRequest request,
+                      StreamObserver<RangeResponse> responseObserver) {
         rangeStore.range(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<RangeResponse>(responseObserver) {
+                @Override
+                protected RangeResponse createErrorResp(Throwable cause) {
+                    return RangeResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void put(StorageContainerRequest request,
-                    StreamObserver<StorageContainerResponse> responseObserver) {
+    public void put(PutRequest request,
+                    StreamObserver<PutResponse> responseObserver) {
         rangeStore.put(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<PutResponse>(responseObserver) {
+                @Override
+                protected PutResponse createErrorResp(Throwable cause) {
+                    return PutResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void delete(StorageContainerRequest request,
-                       StreamObserver<StorageContainerResponse> responseObserver) {
+    public void delete(DeleteRangeRequest request,
+                       StreamObserver<DeleteRangeResponse> responseObserver) {
         rangeStore.delete(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<DeleteRangeResponse>(responseObserver) {
+                @Override
+                protected DeleteRangeResponse createErrorResp(Throwable cause) {
+                    return DeleteRangeResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void txn(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+    public void txn(TxnRequest request,
+                    StreamObserver<TxnResponse> responseObserver) {
         rangeStore.txn(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<TxnResponse>(responseObserver) {
+                @Override
+                protected TxnResponse createErrorResp(Throwable cause) {
+                    return TxnResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void increment(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+    public void increment(IncrementRequest request,
+                          StreamObserver<IncrementResponse> responseObserver) {
         rangeStore.incr(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<IncrementResponse>(responseObserver) {
+                @Override
+                protected IncrementResponse createErrorResp(Throwable cause) {
+                    return IncrementResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
deleted file mode 100644
index 42c21a4..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Response handler to handle storage container response.
- */
-public class StorageContainerResponseHandler extends ResponseHandler<StorageContainerResponse> {
-
-    public static StorageContainerResponseHandler of(StreamObserver<StorageContainerResponse> respObserver) {
-        return new StorageContainerResponseHandler(respObserver);
-    }
-
-    private StorageContainerResponseHandler(StreamObserver<StorageContainerResponse> respObserver) {
-        super(respObserver);
-    }
-
-    @Override
-    protected StorageContainerResponse createErrorResp(Throwable cause) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
-            .build();
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
index d7ef678..9a9670a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.stream.storage.impl.kv;
 
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.fromProtoCompare;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.handleCause;
-import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.mvccCodeToStatusCode;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.newStoreKey;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processDeleteResult;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processIncrementResult;
@@ -46,17 +45,18 @@ import org.apache.bookkeeper.api.kv.result.TxnResult;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 
 /**
@@ -72,13 +72,11 @@ public class TableStoreImpl implements TableStore {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        RangeRequest rangeReq = request.getKvRangeReq();
-
+    public CompletableFuture<RangeResponse> range(RangeRequest rangeReq) {
         if (log.isTraceEnabled()) {
             log.trace("Received range request {}", rangeReq);
         }
-        return range(rangeReq)
+        return doRange(rangeReq)
             .thenApply(result -> {
                 try {
                     RangeResponse rangeResp = processRangeResult(
@@ -89,19 +87,18 @@ public class TableStoreImpl implements TableStore {
                     result.close();
                 }
             })
-            .thenApply(rangeResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvRangeResp(rangeResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process range request {}", rangeReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return RangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(rangeReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<RangeResult<byte[], byte[]>> range(RangeRequest request) {
+    private CompletableFuture<RangeResult<byte[], byte[]>> doRange(RangeRequest request) {
         RangeOp<byte[], byte[]> op = buildRangeOp(request.getHeader(), request);
         return store.range(op)
             .whenComplete((rangeResult, throwable) -> op.close());
@@ -143,10 +140,8 @@ public class TableStoreImpl implements TableStore {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        PutRequest putReq = request.getKvPutReq();
-
-        return put(putReq)
+    public CompletableFuture<PutResponse> put(PutRequest putReq) {
+        return doPut(putReq)
             .thenApply(result -> {
                 try {
                     return processPutResult(
@@ -156,19 +151,18 @@ public class TableStoreImpl implements TableStore {
                     result.close();
                 }
             })
-            .thenApply(putResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvPutResp(putResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process put request {}", putReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return PutResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(putReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<PutResult<byte[], byte[]>> put(PutRequest request) {
+    private CompletableFuture<PutResult<byte[], byte[]>> doPut(PutRequest request) {
         PutOp<byte[], byte[]> op = buildPutOp(request.getHeader(), request);
         return store.put(op)
             .whenComplete((putResult, throwable) -> op.close());
@@ -187,10 +181,8 @@ public class TableStoreImpl implements TableStore {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        IncrementRequest incrementReq = request.getKvIncrReq();
-
-        return increment(incrementReq)
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest incrementReq) {
+        return doIncrement(incrementReq)
             .thenApply(result -> {
                 try {
                     return processIncrementResult(
@@ -200,19 +192,18 @@ public class TableStoreImpl implements TableStore {
                     result.close();
                 }
             })
-            .thenApply(incrementResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvIncrResp(incrementResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process increment request {}", incrementReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return IncrementResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(incrementReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<IncrementResult<byte[], byte[]>> increment(IncrementRequest request) {
+    private CompletableFuture<IncrementResult<byte[], byte[]>> doIncrement(IncrementRequest request) {
         IncrementOp<byte[], byte[]> op = buildIncrementOp(request.getHeader(), request);
         return store.increment(op)
             .whenComplete((incrementResult, throwable) -> op.close());
@@ -231,10 +222,8 @@ public class TableStoreImpl implements TableStore {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        DeleteRangeRequest deleteReq = request.getKvDeleteReq();
-
-        return delete(deleteReq)
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest deleteReq) {
+        return doDelete(deleteReq)
             .thenApply(result -> {
                 try {
                     return processDeleteResult(
@@ -244,16 +233,15 @@ public class TableStoreImpl implements TableStore {
                     result.close();
                 }
             })
-            .thenApply(deleteResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvDeleteResp(deleteResp)
-                .build())
-            .exceptionally(cause -> StorageContainerResponse.newBuilder()
-                .setCode(handleCause(cause))
+            .exceptionally(cause -> DeleteRangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(handleCause(cause))
+                    .setRoutingHeader(deleteReq.getHeader())
+                    .build())
                 .build());
     }
 
-    private CompletableFuture<DeleteResult<byte[], byte[]>> delete(DeleteRangeRequest request) {
+    private CompletableFuture<DeleteResult<byte[], byte[]>> doDelete(DeleteRangeRequest request) {
         DeleteOp<byte[], byte[]> op = buildDeleteOp(request.getHeader(), request);
         return store.delete(op)
             .whenComplete((deleteResult, throwable) -> op.close());
@@ -277,30 +265,27 @@ public class TableStoreImpl implements TableStore {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        TxnRequest txnReq = request.getKvTxnReq();
-
+    public CompletableFuture<TxnResponse> txn(TxnRequest txnReq) {
         if (log.isTraceEnabled()) {
             log.trace("Received txn request : {}", txnReq);
         }
-        return txn(txnReq)
+        return doTxn(txnReq)
             .thenApply(txnResult -> {
                 try {
-                    TxnResponse txnResponse = processTxnResult(txnReq.getHeader(), txnResult);
-                    return StorageContainerResponse.newBuilder()
-                        .setCode(mvccCodeToStatusCode(txnResult.code()))
-                        .setKvTxnResp(txnResponse)
-                        .build();
+                    return processTxnResult(txnReq.getHeader(), txnResult);
                 } finally {
                     txnResult.close();
                 }
             })
-            .exceptionally(cause -> StorageContainerResponse.newBuilder()
-                .setCode(handleCause(cause))
+            .exceptionally(cause -> TxnResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(handleCause(cause))
+                    .setRoutingHeader(txnReq.getHeader())
+                    .build())
                 .build());
     }
 
-    private CompletableFuture<TxnResult<byte[], byte[]>> txn(TxnRequest request) {
+    private CompletableFuture<TxnResult<byte[], byte[]>> doTxn(TxnRequest request) {
         TxnOp<byte[], byte[]> op = buildTxnOp(request);
         return store.txn(op)
             .whenComplete((txnResult, throwable) -> op.close());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
index a9a8dce..964ca62 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
@@ -158,6 +158,7 @@ final class TableStoreUtils {
         ByteString rKey = routingHeader.getRKey();
         PutResponse.Builder putRespBuilder = PutResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build());
         if (null != result.prevKv()) {
@@ -170,6 +171,7 @@ final class TableStoreUtils {
                                                     IncrementResult<byte[], byte[]> result) {
         IncrementResponse.Builder putRespBuilder = IncrementResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setTotalAmount(result.totalAmount());
@@ -182,6 +184,7 @@ final class TableStoreUtils {
         return RangeResponse.newBuilder()
             .setCount(result.count())
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .addAllKvs(Lists.transform(result.kvs(), kv -> newKeyValue(rKey, kv)))
@@ -194,6 +197,7 @@ final class TableStoreUtils {
         ByteString rKey = routingHeader.getRKey();
         return DeleteRangeResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setDeleted(result.numDeleted())
@@ -205,6 +209,7 @@ final class TableStoreUtils {
                                         TxnResult<byte[], byte[]> txnResult) {
         return TxnResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(txnResult.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setSucceeded(txnResult.isSuccess())
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 62502a2..530d4cd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -18,9 +18,6 @@
 
 package org.apache.bookkeeper.stream.storage.impl.metadata;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.GET_ACTIVE_RANGES_REQ;
-
 import com.google.common.collect.Maps;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -30,12 +27,11 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.RelationType;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
@@ -70,11 +66,11 @@ public class MetaRangeStoreImpl
     // Stream API
     //
 
-    private CompletableFuture<StorageContainerResponse> createStreamIfMissing(long streamId,
-                                                                              MetaRangeImpl metaRange,
-                                                                              StreamProperties streamProps) {
+    private CompletableFuture<GetActiveRangesResponse> createStreamIfMissing(long streamId,
+                                                                             MetaRangeImpl metaRange,
+                                                                             StreamProperties streamProps) {
         if (null == streamProps) {
-            return FutureUtils.value(StorageContainerResponse.newBuilder()
+            return FutureUtils.value(GetActiveRangesResponse.newBuilder()
                 .setCode(StatusCode.STREAM_NOT_FOUND)
                 .build());
         }
@@ -86,7 +82,7 @@ public class MetaRangeStoreImpl
                 }
                 return getActiveRanges(metaRange);
             } else {
-                return FutureUtils.value(StorageContainerResponse.newBuilder()
+                return FutureUtils.value(GetActiveRangesResponse.newBuilder()
                     .setCode(StatusCode.INTERNAL_SERVER_ERROR)
                     .build());
             }
@@ -94,11 +90,8 @@ public class MetaRangeStoreImpl
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        checkState(
-            GET_ACTIVE_RANGES_REQ == request.getRequestCase(),
-            "Wrong request type: %s", request.getRequestCase());
-        final long streamId = request.getGetActiveRangesReq().getStreamId();
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+        final long streamId = request.getStreamId();
 
         MetaRangeImpl metaRange = streams.get(streamId);
 
@@ -107,8 +100,8 @@ public class MetaRangeStoreImpl
             return metaRangeImpl.load(streamId)
                 .thenCompose(mr -> {
                     if (null == mr) {
-                        StreamProperties streamProps = request.getGetActiveRangesReq().hasStreamProps()
-                            ? request.getGetActiveRangesReq().getStreamProps() : null;
+                        StreamProperties streamProps = request.hasStreamProps()
+                            ? request.getStreamProps() : null;
                         return createStreamIfMissing(streamId, metaRangeImpl, streamProps);
                     } else {
                         synchronized (streams) {
@@ -122,8 +115,7 @@ public class MetaRangeStoreImpl
         }
     }
 
-    private CompletableFuture<StorageContainerResponse> getActiveRanges(MetaRange metaRange) {
-        StorageContainerResponse.Builder scBuilder = StorageContainerResponse.newBuilder();
+    private CompletableFuture<GetActiveRangesResponse> getActiveRanges(MetaRange metaRange) {
         GetActiveRangesResponse.Builder respBuilder = GetActiveRangesResponse.newBuilder();
         return metaRange.getActiveRanges()
             .thenApplyAsync(ranges -> {
@@ -134,12 +126,11 @@ public class MetaRangeStoreImpl
                         .addAllRelatedRanges(range.getParentsList());
                     respBuilder.addRanges(rrBuilder);
                 }
-                return scBuilder
+                return respBuilder
                     .setCode(StatusCode.SUCCESS)
-                    .setGetActiveRangesResp(respBuilder)
                     .build();
             }, executor)
-            .exceptionally(cause -> scBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
+            .exceptionally(cause -> respBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
     }
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
index e15b808..4a1be55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
@@ -18,13 +18,21 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -33,12 +41,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
@@ -56,11 +64,10 @@ final class FailRequestRangeStoreService implements RangeStoreService {
         this.scheduler = scheduler;
     }
 
-    private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
+    private <T> CompletableFuture<T> failWrongGroupRequest() {
         CompletableFuture<T> future = FutureUtils.createFuture();
-        scheduler.executeOrdered(scId, () -> {
-            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
-        });
+        scheduler.execute(() ->
+            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND)));
         return future;
     }
 
@@ -80,17 +87,17 @@ final class FailRequestRangeStoreService implements RangeStoreService {
 
     @Override
     public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     //
@@ -99,17 +106,17 @@ final class FailRequestRangeStoreService implements RangeStoreService {
 
     @Override
     public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     //
@@ -117,8 +124,8 @@ final class FailRequestRangeStoreService implements RangeStoreService {
     //
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+        return failWrongGroupRequest();
     }
 
     //
@@ -127,27 +134,27 @@ final class FailRequestRangeStoreService implements RangeStoreService {
 
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<RangeResponse> range(RangeRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<PutResponse> put(PutRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+        return failWrongGroupRequest();
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 0304fc8..7adcefd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -18,12 +18,6 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +34,16 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,12 +52,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
@@ -232,7 +231,7 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     //
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
         return mgStore.getActiveRanges(request);
     }
 
@@ -242,12 +241,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
 
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        checkArgument(KV_RANGE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        RangeRequest rr = request.getKvRangeReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<RangeResponse> range(RangeRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -260,12 +255,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        checkArgument(KV_PUT_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        PutRequest rr = request.getKvPutReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<PutResponse> put(PutRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -278,12 +269,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        checkArgument(KV_DELETE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        DeleteRangeRequest rr = request.getKvDeleteReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -296,12 +283,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        checkArgument(KV_TXN_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        TxnRequest rr = request.getKvTxnReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -314,12 +297,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        checkArgument(KV_INCR_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        IncrementRequest ir = request.getKvIncrReq();
-        RoutingHeader header = ir.getHeader();
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index a269466..9293117 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -16,7 +16,6 @@ package org.apache.bookkeeper.stream.storage.impl;
 
 import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenableFuture;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createDeleteNamespaceRequest;
@@ -46,6 +45,7 @@ import io.grpc.inprocess.InProcessServerBuilder;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -64,6 +64,8 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -72,6 +74,8 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
@@ -81,10 +85,6 @@ import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRange
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -140,6 +140,7 @@ public class TestStorageContainerStoreImpl {
     private TableServiceFutureStub tableService;
     private RootRangeServiceFutureStub rootRangeService;
     private MetaRangeServiceFutureStub metaRangeService;
+    private long scId;
 
     //
     // Utils for table api
@@ -156,64 +157,52 @@ public class TestStorageContainerStoreImpl {
         .setRoutingHeader(TEST_ROUTING_HEADER)
         .build();
 
-    private static StorageContainerRequest createPutRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvPutReq(PutRequest.newBuilder()
+    private static PutRequest createPutRequest() {
+        return PutRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
                 .setValue(TEST_VAL)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createPutResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvPutResp(PutResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
-                .build())
-            .build();
+    private static PutResponse createPutResponse(StatusCode code) {
+        return PutResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
+                .build();
     }
 
-    private static StorageContainerRequest createRangeRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvRangeReq(RangeRequest.newBuilder()
+    private static RangeRequest createRangeRequest() {
+        return RangeRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createRangeResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvRangeResp(RangeResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
+    private static RangeResponse createRangeResponse(StatusCode code) {
+        return RangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
                 .setCount(0)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerRequest createDeleteRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvDeleteReq(DeleteRangeRequest.newBuilder()
+    private static DeleteRangeRequest createDeleteRequest() {
+        return DeleteRangeRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createDeleteResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvDeleteResp(DeleteRangeResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
+    private static DeleteRangeResponse createDeleteResponse(StatusCode code) {
+        return DeleteRangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
                 .setDeleted(0)
-                .build())
-            .build();
+                .build();
     }
 
     @SuppressWarnings("unchecked")
@@ -263,10 +252,12 @@ public class TestStorageContainerStoreImpl {
             .usePlaintext()
             .build();
 
+        scId = ThreadLocalRandom.current().nextInt(2);
+
         // intercept the channel with storage container information.
         channel = ClientInterceptors.intercept(
             channel,
-            new StorageContainerClientInterceptor(0L));
+            new StorageContainerClientInterceptor(scId));
 
 
         tableService = TableServiceGrpc.newFutureStub(channel);
@@ -307,7 +298,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testCreateNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-create-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -317,7 +308,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testDeleteNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -327,7 +318,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testGetNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-get-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -395,7 +386,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testCreateStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-create-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -406,7 +397,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testDeleteStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -417,7 +408,7 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testGetStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-get-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -482,26 +473,24 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testGetActiveRangesNoManager() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            metaRangeService.getActiveRanges(createGetActiveRangesRequest(12L, 34L))),
+            metaRangeService.getActiveRanges(createGetActiveRangesRequest(34L))),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testGetActiveRangesMockManager() throws Exception {
-        long scId = System.currentTimeMillis();
-
-        StorageContainerResponse resp = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse resp = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.STREAM_NOT_FOUND)
             .build();
-        StorageContainerRequest request = createGetActiveRangesRequest(scId, 34L);
+        GetActiveRangesRequest request = createGetActiveRangesRequest(34L);
 
         when(mockRangeStoreService.getActiveRanges(request))
             .thenReturn(CompletableFuture.completedFuture(resp));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<GetActiveRangesResponse> future = fromListenableFuture(
             metaRangeService.getActiveRanges(request));
         verify(mockRangeStoreService, times(1)).getActiveRanges(request);
         assertTrue(resp == future.get());
@@ -514,40 +503,40 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testPutNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.put(createPutRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testDeleteNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.delete(createDeleteRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testRangeNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.range(createRangeRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testRangeMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createRangeResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createRangeRequest(ROOT_STORAGE_CONTAINER_ID);
+        RangeResponse response = createRangeResponse(StatusCode.SUCCESS);
+        RangeRequest request = createRangeRequest();
 
         when(mockRangeStoreService.range(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<RangeResponse> future = fromListenableFuture(
             tableService.range(request));
         verify(mockRangeStoreService, times(1)).range(eq(request));
         assertTrue(response == future.get());
@@ -555,13 +544,13 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testDeleteMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createDeleteResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createDeleteRequest(ROOT_STORAGE_CONTAINER_ID);
+        DeleteRangeResponse response = createDeleteResponse(StatusCode.SUCCESS);
+        DeleteRangeRequest request = createDeleteRequest();
 
         when(mockRangeStoreService.delete(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<DeleteRangeResponse> future = fromListenableFuture(
             tableService.delete(request));
         verify(mockRangeStoreService, times(1)).delete(eq(request));
         assertTrue(response == future.get());
@@ -569,13 +558,13 @@ public class TestStorageContainerStoreImpl {
 
     @Test
     public void testPutMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createPutResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createPutRequest(ROOT_STORAGE_CONTAINER_ID);
+        PutResponse response = createPutResponse(StatusCode.SUCCESS);
+        PutRequest request = createPutRequest();
 
         when(mockRangeStoreService.put(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<PutResponse> future = fromListenableFuture(
             tableService.put(request));
         verify(mockRangeStoreService, times(1)).put(eq(request));
         assertTrue(response == future.get());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 178ac19..3f87b9e 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -30,8 +30,6 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
@@ -51,23 +49,19 @@ public class TestGrpcMetaRangeService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
-            .setGetActiveRangesResp(GetActiveRangesResponse.newBuilder())
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
@@ -82,22 +76,19 @@ public class TestGrpcMetaRangeService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
@@ -112,18 +103,15 @@ public class TestGrpcMetaRangeService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index e943fb4..874e69b 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -34,10 +34,9 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
@@ -66,24 +65,24 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvPutResp(PutResponse.newBuilder())
+        PutResponse response = PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.put(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -98,23 +97,24 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        PutResponse response = PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.put(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -129,19 +129,17 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.put(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -156,23 +154,23 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvRangeResp(RangeResponse.newBuilder())
+        RangeResponse response = RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.range(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -187,22 +185,23 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        RangeResponse response = RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.range(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -217,18 +216,16 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.range(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -243,23 +240,23 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvDeleteResp(DeleteRangeResponse.newBuilder())
+        DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
@@ -274,22 +271,23 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
@@ -304,18 +302,16 @@ public class TestGrpcTableService {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
deleted file mode 100644
index 3eb440a..0000000
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.exceptions.StorageException;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-/**
- * Unit test for {@link StorageContainerResponseHandler}.
- */
-public class TestStorageContainerResponseHandler {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSuccessResponse() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .build();
-        handler.accept(response, null);
-        verify(observer, times(1)).onNext(response);
-        verify(observer, times(1)).onCompleted();
-        verify(observer, times(0)).onError(any());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testStatusRuntimeException() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StatusRuntimeException exception = new StatusRuntimeException(Status.NOT_FOUND);
-        handler.accept(null, exception);
-        verify(observer, times(0)).onNext(any());
-        verify(observer, times(0)).onCompleted();
-        verify(observer, times(1)).onError(exception);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testStatusException() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StatusException exception = new StatusException(Status.NOT_FOUND);
-        handler.accept(null, exception);
-        verify(observer, times(0)).onNext(any());
-        verify(observer, times(0)).onCompleted();
-        verify(observer, times(1)).onError(exception);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testInternalError() throws Exception {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        AtomicReference<StorageContainerResponse> responseHolder =
-            new AtomicReference<>(null);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer((Answer<Void>) invocation -> {
-            StorageContainerResponse resp = invocation.getArgument(0);
-            responseHolder.set(resp);
-            latch.countDown();
-            return null;
-        }).when(observer).onNext(any(StorageContainerResponse.class));
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StorageException exception = new StorageException("test-exception");
-        handler.accept(null, exception);
-        verify(observer, times(1)).onNext(any());
-        verify(observer, times(1)).onCompleted();
-        verify(observer, times(0)).onError(any());
-
-        latch.await();
-        assertNotNull(responseHolder.get());
-        assertEquals(
-            StatusCode.INTERNAL_SERVER_ERROR,
-            responseHolder.get().getCode());
-    }
-
-}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
index f7b88fc..2c7124f 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
@@ -46,9 +46,6 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse.ResponseCase;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
 import org.junit.Test;
 
@@ -90,16 +87,14 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
     }
 
     private List<KeyValue> writeKVs(int numPairs, boolean prevKv) throws Exception {
-        List<CompletableFuture<StorageContainerResponse>> results =
+        List<CompletableFuture<PutResponse>> results =
             Lists.newArrayListWithExpectedSize(numPairs);
         for (int i = 0; i < numPairs; i++) {
             results.add(writeKV(i, prevKv));
         }
         return Lists.transform(
-            result(FutureUtils.collect(results)), response -> {
-                assertEquals(StatusCode.SUCCESS, response.getCode());
-                assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
-                PutResponse putResp = response.getKvPutResp();
+            result(FutureUtils.collect(results)), putResp -> {
+                assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
                 assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
                 if (putResp.hasPrevKv()) {
                     return putResp.getPrevKv();
@@ -109,33 +104,26 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
             });
     }
 
-    private CompletableFuture<StorageContainerResponse> writeKV(int i, boolean prevKv) {
-        return tableStore.put(StorageContainerRequest.newBuilder()
-            .setScId(SC_ID)
-            .setKvPutReq(PutRequest.newBuilder()
-                .setKey(getKey(i))
-                .setValue(getValue(i))
-                .setHeader(HEADER)
-                .setPrevKv(prevKv))
+    private CompletableFuture<PutResponse> writeKV(int i, boolean prevKv) {
+        return tableStore.put(PutRequest.newBuilder()
+            .setKey(getKey(i))
+            .setValue(getValue(i))
+            .setHeader(HEADER)
+            .setPrevKv(prevKv)
             .build());
     }
 
-    StorageContainerResponse getKeyFromTableStore(int i) throws Exception {
+    RangeResponse getKeyFromTableStore(int i) throws Exception {
         return result(
-            tableStore.range(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvRangeReq(RangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(i))
-                    .build())
+            tableStore.range(RangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(i))
                 .build()));
     }
 
     KeyValue getKeyValue(int i) throws Exception {
-        StorageContainerResponse resp = getKeyFromTableStore(i);
-        assertEquals(StatusCode.SUCCESS, resp.getCode());
-        assertEquals(ResponseCase.KV_RANGE_RESP, resp.getResponseCase());
-        RangeResponse rr = resp.getKvRangeResp();
+        RangeResponse rr = getKeyFromTableStore(i);
+        assertEquals(StatusCode.SUCCESS, rr.getHeader().getCode());
         assertEquals(HEADER, rr.getHeader().getRoutingHeader());
         assertFalse(rr.getMore());
         if (0 == rr.getCount()) {
@@ -146,53 +134,43 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
     }
 
     void putKeyToTableStore(int key, int value) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.put(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvPutReq(PutRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(key))
-                    .setValue(getValue(value))
-                    .build())
+        PutResponse putResp = result(
+            tableStore.put(PutRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(key))
+                .setValue(getValue(value))
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
-        PutResponse putResp = response.getKvPutResp();
+        assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
         assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
         assertFalse(putResp.hasPrevKv());
     }
 
     KeyValue putIfAbsentToTableStore(int key, int value, boolean expectedSuccess) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.VALUE)
+        TxnResponse txnResp = result(
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.VALUE)
+                    .setKey(getKey(key))
+                    .setValue(ByteString.copyFrom(new byte[0])))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setValue(ByteString.copyFrom(new byte[0])))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .build()))
                 .build()));
 
-        assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
-        TxnResponse txnResp = response.getKvTxnResp();
         assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
-        assertEquals(StatusCode.SUCCESS, response.getCode());
+        assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
 
         ResponseOp respOp = txnResp.getResponses(0);
         if (expectedSuccess) {
@@ -215,39 +193,34 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
         }
     }
 
-    StorageContainerResponse vPutToTableStore(int key, int value, long version)
+    TxnResponse vPutToTableStore(int key, int value, long version)
         throws Exception {
         return result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.VERSION)
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.VERSION)
+                    .setKey(getKey(key))
+                    .setVersion(version))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setVersion(version))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .build()))
                 .build()));
     }
 
-    KeyValue verifyVPutResponse(StorageContainerResponse response, boolean expectedSuccess) throws Exception {
-        assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
-        TxnResponse txnResp = response.getKvTxnResp();
+    KeyValue verifyVPutResponse(TxnResponse txnResp, boolean expectedSuccess) throws Exception {
         assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
-        assertEquals(StatusCode.SUCCESS, response.getCode());
+        assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
 
         ResponseOp respOp = txnResp.getResponses(0);
         if (expectedSuccess) {
@@ -270,89 +243,71 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
         }
     }
 
-    StorageContainerResponse rPutToTableStore(int key, int value, long revision)
+    TxnResponse rPutToTableStore(int key, int value, long revision)
         throws Exception {
         return result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.MOD)
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.MOD)
+                    .setKey(getKey(key))
+                    .setModRevision(revision))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setModRevision(revision))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .build()))
                 .build()));
     }
 
     KeyValue deleteKeyFromTableStore(int key) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.delete(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvDeleteReq(DeleteRangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(key))
-                    .setPrevKv(true)
-                    .build())
+        DeleteRangeResponse response = result(
+            tableStore.delete(DeleteRangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(key))
+                .setPrevKv(true)
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
-        DeleteRangeResponse delResp = response.getKvDeleteResp();
-        assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
-        if (0 == delResp.getPrevKvsCount()) {
+        assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
+        assertEquals(HEADER, response.getHeader().getRoutingHeader());
+        if (0 == response.getPrevKvsCount()) {
             return null;
         } else {
-            return delResp.getPrevKvs(0);
+            return response.getPrevKvs(0);
         }
     }
 
     List<KeyValue> deleteRange(int startKey, int endKey) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.delete(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvDeleteReq(DeleteRangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(startKey))
-                    .setRangeEnd(getKey(endKey))
-                    .setPrevKv(true)
-                    .build())
+        DeleteRangeResponse delResp = result(
+            tableStore.delete(DeleteRangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(startKey))
+                .setRangeEnd(getKey(endKey))
+                .setPrevKv(true)
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
-        DeleteRangeResponse delResp = response.getKvDeleteResp();
+        assertEquals(StatusCode.SUCCESS, delResp.getHeader().getCode());
         assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
         return delResp.getPrevKvsList();
     }
 
     List<KeyValue> range(int startKey, int endKey) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.range(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvRangeReq(RangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(startKey))
-                    .setRangeEnd(getKey(endKey))
-                    .build())
+        RangeResponse rangeResp = result(
+            tableStore.range(RangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(startKey))
+                .setRangeEnd(getKey(endKey))
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_RANGE_RESP, response.getResponseCase());
-        RangeResponse rangeResp = response.getKvRangeResp();
+        assertEquals(StatusCode.SUCCESS, rangeResp.getHeader().getCode());
         assertEquals(HEADER, rangeResp.getHeader().getRoutingHeader());
         return rangeResp.getKvsList();
     }
@@ -398,18 +353,18 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
             int key = 2;
             int initialVal = 2;
             int casVal = 99;
-            StorageContainerResponse response = vPutToTableStore(key, initialVal, 100L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            TxnResponse response = vPutToTableStore(key, initialVal, 100L);
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // vPut(k, v, -1L)
             response = vPutToTableStore(key, initialVal, -1L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
             // put(key2, v)
             KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
             assertNull(prevKV);
             // vPut(key2, v, 0)
             response = vPutToTableStore(key, casVal, 0);
-            assertEquals(StatusCode.SUCCESS, response.getCode());
+            assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
             prevKV = verifyVPutResponse(response, true);
             assertNotNull(prevKV);
             assertEquals(getKey(key), prevKV.getKey());
@@ -428,12 +383,12 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
             int initialVal = 3;
             int casVal = 99;
 
-            StorageContainerResponse response = rPutToTableStore(key, initialVal, 100L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            TxnResponse response = rPutToTableStore(key, initialVal, 100L);
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // rPut(k, v, -1L)
             response = rPutToTableStore(key, initialVal, -1L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // put(key2, v)
             KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
@@ -445,7 +400,7 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
 
             // rPut(key2, v, 0)
             response = rPutToTableStore(key, casVal, revision);
-            assertEquals(StatusCode.SUCCESS, response.getCode());
+            assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
 
             kv = getKeyValue(key);
             assertEquals(revision + 1, kv.getModRevision());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d2ed9b6..d0156c5 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -36,8 +36,6 @@ import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.impl.metadata.stream.MetaRangeImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
@@ -69,21 +67,18 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
     protected void doTeardown() throws Exception {
     }
 
-    StorageContainerRequest createRequest(StreamProperties streamProperties) {
+    GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
         GetActiveRangesRequest.Builder reqBuilder = GetActiveRangesRequest.newBuilder()
             .setStreamId(this.streamProps.getStreamId());
         if (null != streamProperties) {
             reqBuilder = reqBuilder.setStreamProps(streamProperties);
         }
-        return StorageContainerRequest.newBuilder()
-            .setGetActiveRangesReq(reqBuilder)
-            .setScId(1L)
-            .build();
+        return reqBuilder.build();
     }
 
     @Test
     public void testCreateIfMissingPropsNotSpecified() throws Exception {
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(null)));
 
         assertEquals(StatusCode.STREAM_NOT_FOUND, resp.getCode());
@@ -91,13 +86,11 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
 
     @Test
     public void testCreateIfMissing() throws Exception {
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, resp.getCode());
-        GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
-
-        verifyGetResponse(getResp);
+        verifyGetResponse(resp);
     }
 
     private void verifyGetResponse(GetActiveRangesResponse getResp) throws Exception {
@@ -222,22 +215,19 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
 
     @Test
     public void testGetTwice() throws Exception {
-
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, resp.getCode());
 
-        GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
-        verifyGetResponse(getResp);
+        verifyGetResponse(resp);
 
-        StorageContainerResponse secondResp = FutureUtils.result(
+        GetActiveRangesResponse secondResp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, secondResp.getCode());
 
-        GetActiveRangesResponse secondGetResp = resp.getGetActiveRangesResp();
-        verifyGetResponse(secondGetResp);
+        verifyGetResponse(secondResp);
     }
 
 }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
index a4d8710..2cbb27f 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
@@ -17,10 +17,6 @@
  */
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +36,15 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,13 +53,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
@@ -322,11 +321,11 @@ public class RangeStoreServiceImplTest {
     public void testGetActiveRanges() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(mrStore.getActiveRanges(any(StorageContainerRequest.class)))
+        GetActiveRangesResponse expectedResp = GetActiveRangesResponse.getDefaultInstance();
+        when(mrStore.getActiveRanges(any(GetActiveRangesRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest expectedReq = StorageContainerRequest.getDefaultInstance();
+        GetActiveRangesRequest expectedReq = GetActiveRangesRequest.getDefaultInstance();
         assertSame(
             expectedResp,
             FutureUtils.result(mrStore.getActiveRanges(expectedReq)));
@@ -338,50 +337,66 @@ public class RangeStoreServiceImplTest {
     // Table API
     //
 
-    private StorageContainerRequest newStorageContainerRequest(RequestCase type) {
-        StorageContainerRequest.Builder reqBuilder = StorageContainerRequest.newBuilder()
-            .setScId(SCID);
+    private PutRequest newPutRequest() {
         RoutingHeader header = RoutingHeader.newBuilder()
             .setStreamId(STREAM_ID)
             .setRangeId(RANGE_ID)
             .build();
-        switch (type) {
-            case KV_PUT_REQ:
-                reqBuilder = reqBuilder.setKvPutReq(
-                    PutRequest.newBuilder().setHeader(header));
-                break;
-            case KV_DELETE_REQ:
-                reqBuilder = reqBuilder.setKvDeleteReq(
-                    DeleteRangeRequest.newBuilder().setHeader(header));
-                break;
-            case KV_RANGE_REQ:
-                reqBuilder = reqBuilder.setKvRangeReq(
-                    RangeRequest.newBuilder().setHeader(header));
-                break;
-            case KV_TXN_REQ:
-                reqBuilder = reqBuilder.setKvTxnReq(
-                    TxnRequest.newBuilder().setHeader(header));
-                break;
-            case KV_INCR_REQ:
-                reqBuilder = reqBuilder.setKvIncrReq(
-                    IncrementRequest.newBuilder().setHeader(header));
-                break;
-            default:
-                break;
-        }
-        return reqBuilder.build();
+        return PutRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private DeleteRangeRequest newDeleteRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return DeleteRangeRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private RangeRequest newRangeRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return RangeRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private IncrementRequest newIncrRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return IncrementRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private TxnRequest newTxnRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return TxnRequest.newBuilder()
+            .setHeader(header)
+            .build();
     }
 
     @Test
     public void testRangeWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.range(any(StorageContainerRequest.class)))
+        RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+        when(trStore.range(any(RangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.range(request));
+        RangeRequest request = newRangeRequest();
+        RangeResponse response = FutureUtils.result(container.range(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -390,13 +405,13 @@ public class RangeStoreServiceImplTest {
     public void testRangeWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.range(any(StorageContainerRequest.class)))
+        RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+        when(trStore.range(any(RangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.range(request));
+        RangeRequest request = newRangeRequest();
+        RangeResponse response = FutureUtils.result(container.range(request));
         assertSame(expectedResp, response);
     }
 
@@ -404,12 +419,12 @@ public class RangeStoreServiceImplTest {
     public void testPutWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.put(any(StorageContainerRequest.class)))
+        PutResponse expectedResp = PutResponse.getDefaultInstance();
+        when(trStore.put(any(PutRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.put(request));
+        PutRequest request = newPutRequest();
+        PutResponse response = FutureUtils.result(container.put(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -418,13 +433,13 @@ public class RangeStoreServiceImplTest {
     public void testPutWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.put(any(StorageContainerRequest.class)))
+        PutResponse expectedResp = PutResponse.getDefaultInstance();
+        when(trStore.put(any(PutRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.put(request));
+        PutRequest request = newPutRequest();
+        PutResponse response = FutureUtils.result(container.put(request));
         assertSame(expectedResp, response);
     }
 
@@ -432,12 +447,12 @@ public class RangeStoreServiceImplTest {
     public void testDeleteWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.delete(any(StorageContainerRequest.class)))
+        DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+        when(trStore.delete(any(DeleteRangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.delete(request));
+        DeleteRangeRequest request = newDeleteRequest();
+        DeleteRangeResponse response = FutureUtils.result(container.delete(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -446,13 +461,13 @@ public class RangeStoreServiceImplTest {
     public void testDeleteWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.delete(any(StorageContainerRequest.class)))
+        DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+        when(trStore.delete(any(DeleteRangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.delete(request));
+        DeleteRangeRequest request = newDeleteRequest();
+        DeleteRangeResponse response = FutureUtils.result(container.delete(request));
         assertSame(expectedResp, response);
     }
 
@@ -460,12 +475,12 @@ public class RangeStoreServiceImplTest {
     public void testTxnWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.txn(any(StorageContainerRequest.class)))
+        TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+        when(trStore.txn(any(TxnRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.txn(request));
+        TxnRequest request = newTxnRequest();
+        TxnResponse response = FutureUtils.result(container.txn(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -474,13 +489,13 @@ public class RangeStoreServiceImplTest {
     public void testTxnWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.txn(any(StorageContainerRequest.class)))
+        TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+        when(trStore.txn(any(TxnRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.txn(request));
+        TxnRequest request = newTxnRequest();
+        TxnResponse response = FutureUtils.result(container.txn(request));
         assertSame(expectedResp, response);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.