You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/30 17:18:26 UTC

[GitHub] sijie closed pull request #1452: [table service] remove StorageContainerRequest and StorageContainerResponse

sijie closed pull request #1452: [table service] remove StorageContainerRequest and StorageContainerResponse
URL: https://github.com/apache/bookkeeper/pull/1452
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index f5e2f944b..9cd0273d1 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -18,11 +18,26 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>bookkeeper</artifactId>
-    <version>4.8.0-SNAPSHOT</version>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>18</version>
   </parent>
   <artifactId>buildtools</artifactId>
   <name>Apache BookKeeper :: Build Tools</name>
   <version>4.8.0-SNAPSHOT</version>
+  <properties>
+    <spotbugs-maven-plugin.version>3.1.0-RC6</spotbugs-maven-plugin.version>
+  </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <version>${spotbugs-maven-plugin.version}</version>
+        <configuration>
+          <excludeFilterFile>${session.executionRootDirectory}/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/pom.xml b/pom.xml
index 5e5219e0a..7d457e4f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -622,6 +622,12 @@
   <!-- dependencies for all modules -->
   <dependencies>
     <!-- provided dependencies (available at compilation and test classpths and *NOT* packaged) -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>buildtools</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index d9e9388d5..8660aab9b 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,14 +31,14 @@
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceFutureStub;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
 
 /**
  * A channel connected to a range server.
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index 10481f113..7e61b58a2 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -78,10 +78,8 @@ StorageContainerChannel getStorageContainerClient() {
     @Override
     public CompletableFuture<HashStreamRanges> getActiveDataRanges() {
         return MetaRangeRequestProcessor.of(
-            createGetActiveRangesRequest(
-                scClient.getStorageContainerId(),
-                streamProps),
-            (response) -> createActiveRanges(response.getGetActiveRangesResp()),
+            createGetActiveRangesRequest(streamProps),
+            (response) -> createActiveRanges(response),
             scClient,
             executor,
             backoffPolicy
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index cb25e84dd..39e9a5a33 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -21,37 +21,36 @@
 import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createMetaRangeException;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
 import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 
 /**
  * Request Processor processing meta range request.
  */
 public class MetaRangeRequestProcessor<RespT>
-    extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
+    extends ListenableFutureRpcProcessor<GetActiveRangesRequest, GetActiveRangesResponse, RespT> {
 
     public static <T> MetaRangeRequestProcessor<T> of(
-        StorageContainerRequest request,
-        Function<StorageContainerResponse, T> responseFunc,
+        GetActiveRangesRequest request,
+        Function<GetActiveRangesResponse, T> responseFunc,
         StorageContainerChannel channel,
         ScheduledExecutorService executor,
         Backoff.Policy backoffPolicy) {
         return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
     }
 
-    private final StorageContainerRequest request;
-    private final Function<StorageContainerResponse, RespT> responseFunc;
+    private final GetActiveRangesRequest request;
+    private final Function<GetActiveRangesResponse, RespT> responseFunc;
 
-    private MetaRangeRequestProcessor(StorageContainerRequest request,
-                                      Function<StorageContainerResponse, RespT> responseFunc,
+    private MetaRangeRequestProcessor(GetActiveRangesRequest request,
+                                      Function<GetActiveRangesResponse, RespT> responseFunc,
                                       StorageContainerChannel channel,
                                       ScheduledExecutorService executor,
                                       Backoff.Policy backoffPolicy) {
@@ -61,34 +60,23 @@ private MetaRangeRequestProcessor(StorageContainerRequest request,
     }
 
     @Override
-    protected StorageContainerRequest createRequest() {
+    protected GetActiveRangesRequest createRequest() {
         return request;
     }
 
     @Override
-    protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
-                                                                 StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case GET_ACTIVE_RANGES_REQ:
-                return rsChannel.getMetaRangeService().getActiveRanges(request);
-            default:
-                SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
-                respFuture.setException(new Exception("Unknown request " + request));
-                return respFuture;
-        }
+    protected ListenableFuture<GetActiveRangesResponse> sendRPC(StorageServerChannel rsChannel,
+                                                                GetActiveRangesRequest request) {
+        return rsChannel.getMetaRangeService().getActiveRanges(request);
     }
 
-    private String getIdentifier(StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case GET_ACTIVE_RANGES_REQ:
-                return "" + request.getGetActiveRangesReq().getStreamId();
-            default:
-                return "";
-        }
+    private String getIdentifier(GetActiveRangesRequest request) {
+
+        return "" + request.getStreamId();
     }
 
     @Override
-    protected RespT processResponse(StorageContainerResponse response) throws Exception {
+    protected RespT processResponse(GetActiveRangesResponse response) throws Exception {
         if (StatusCode.SUCCESS == response.getCode()) {
             return responseFunc.apply(response);
         }
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
index 189b8490f..971616245 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
@@ -45,13 +45,12 @@
 import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.RelationType;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.junit.Test;
 
 /**
@@ -129,21 +128,18 @@ public void testGetActiveStreamRanges() throws Exception {
 
         // create response
         GetActiveRangesResponse getActiveRangesResponse = GetActiveRangesResponse.newBuilder()
+            .setCode(StatusCode.SUCCESS)
             .addRanges(
                 buildRelatedRange(Long.MIN_VALUE, 0L, 123L, 1L, Lists.newArrayList(113L))
             ).addRanges(
                 buildRelatedRange(0L, Long.MAX_VALUE, 124L, 2L, Lists.newArrayList(114L))
             ).build();
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setGetActiveRangesResp(getActiveRangesResponse)
-            .build();
 
         MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
             @Override
-            public void getActiveRanges(StorageContainerRequest request,
-                                        StreamObserver<StorageContainerResponse> responseObserver) {
-                responseObserver.onNext(response);
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        StreamObserver<GetActiveRangesResponse> responseObserver) {
+                responseObserver.onNext(getActiveRangesResponse);
                 responseObserver.onCompleted();
             }
         };
@@ -154,7 +150,7 @@ public void getActiveRanges(StorageContainerRequest request,
             Optional.empty());
         serviceFuture.complete(rsChannel);
 
-        HashStreamRanges expectedStream = createActiveRanges(response.getGetActiveRangesResp());
+        HashStreamRanges expectedStream = createActiveRanges(getActiveRangesResponse);
         CompletableFuture<HashStreamRanges> getFuture = metaRangeClient.getActiveDataRanges();
         assertEquals(expectedStream, getFuture.get());
     }
@@ -166,8 +162,8 @@ public void testGetActiveStreamRangesFailure() throws Exception {
 
         MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
             @Override
-            public void getActiveRanges(StorageContainerRequest request,
-                                        StreamObserver<StorageContainerResponse> responseObserver) {
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        StreamObserver<GetActiveRangesResponse> responseObserver) {
                 responseObserver.onError(new StatusRuntimeException(Status.INTERNAL));
             }
         };
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
new file mode 100644
index 000000000..88287df1c
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class DeleteRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<DeleteRangeRequest, DeleteRangeResponse, RespT> {
+
+    public static <T> DeleteRequestProcessor<T> of(
+        DeleteRangeRequest request,
+        Function<DeleteRangeResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new DeleteRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final DeleteRangeRequest request;
+    private final Function<DeleteRangeResponse, RespT> responseFunc;
+
+    private DeleteRequestProcessor(DeleteRangeRequest request,
+                                   Function<DeleteRangeResponse, RespT> respFunc,
+                                   StorageContainerChannel channel,
+                                   ScheduledExecutorService executor,
+                                   Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected DeleteRangeRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<DeleteRangeResponse> sendRPC(StorageServerChannel rsChannel,
+                                                            DeleteRangeRequest request) {
+        return rsChannel.getTableService().delete(request);
+    }
+
+    @Override
+    protected RespT processResponse(DeleteRangeResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
new file mode 100644
index 000000000..f41c97b29
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class IncrementRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<IncrementRequest, IncrementResponse, RespT> {
+
+    public static <T> IncrementRequestProcessor<T> of(
+        IncrementRequest request,
+        Function<IncrementResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new IncrementRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final IncrementRequest request;
+    private final Function<IncrementResponse, RespT> responseFunc;
+
+    private IncrementRequestProcessor(IncrementRequest request,
+                                      Function<IncrementResponse, RespT> respFunc,
+                                      StorageContainerChannel channel,
+                                      ScheduledExecutorService executor,
+                                      Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected IncrementRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<IncrementResponse> sendRPC(StorageServerChannel rsChannel,
+                                                          IncrementRequest request) {
+        return rsChannel.getTableService().increment(request);
+    }
+
+    @Override
+    protected RespT processResponse(IncrementResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
index 2fd27ce64..279219bc4 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
@@ -52,9 +52,7 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 
 /**
  * K/V related utils.
@@ -86,15 +84,6 @@ public static ByteString toProtoKey(ByteBuf key) {
         return Lists.transform(kvs, kv -> fromProtoKeyValue(kv, kvFactory));
     }
 
-    public static StorageContainerRequest newKvRangeRequest(
-        long scId,
-        RangeRequest.Builder rangeReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvRangeReq(rangeReq)
-            .build();
-    }
-
     public static RangeRequest.Builder newRangeRequest(ByteBuf key, RangeOption<ByteBuf> option) {
         RangeRequest.Builder builder = RangeRequest.newBuilder()
             .setKey(toProtoKey(key))
@@ -121,15 +110,6 @@ public static StorageContainerRequest newKvRangeRequest(
             .kvs(fromProtoKeyValues(response.getKvsList(), kvFactory));
     }
 
-    public static StorageContainerRequest newKvPutRequest(
-        long scId,
-        PutRequest.Builder putReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvPutReq(putReq)
-            .build();
-    }
-
     public static PutRequest.Builder newPutRequest(ByteBuf key,
                                                    ByteBuf value,
                                                    PutOption<ByteBuf> option) {
@@ -150,15 +130,6 @@ public static StorageContainerRequest newKvPutRequest(
         return result;
     }
 
-    public static StorageContainerRequest newKvIncrementRequest(
-        long scId,
-        IncrementRequest.Builder putReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvIncrReq(putReq)
-            .build();
-    }
-
     public static IncrementRequest.Builder newIncrementRequest(ByteBuf key,
                                                                long amount,
                                                                IncrementOption<ByteBuf> option) {
@@ -177,15 +148,6 @@ public static StorageContainerRequest newKvIncrementRequest(
         return result;
     }
 
-    public static StorageContainerRequest newKvDeleteRequest(
-        long scId,
-        DeleteRangeRequest.Builder deleteReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvDeleteReq(deleteReq)
-            .build();
-    }
-
     public static DeleteRangeRequest.Builder newDeleteRequest(ByteBuf key, DeleteOption<ByteBuf> option) {
         DeleteRangeRequest.Builder builder = DeleteRangeRequest.newBuilder()
             .setKey(UnsafeByteOperations.unsafeWrap(key.nioBuffer()))
@@ -310,15 +272,6 @@ public static CompareResult toProtoResult(org.apache.bookkeeper.api.kv.op.Compar
         return reqBuilder;
     }
 
-    public static StorageContainerRequest newKvTxnRequest(
-        long scId,
-        TxnRequest.Builder txnReq) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvTxnReq(txnReq)
-            .build();
-    }
-
     public static TxnResult<ByteBuf, ByteBuf> newKvTxnResult(
         TxnResponse txnResponse,
         ResultFactory<ByteBuf, ByteBuf> resultFactory,
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 03c64f779..1f07b80bf 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -94,12 +94,11 @@
         if (null != option.endKey()) {
             option.endKey().retain();
         }
-        return TableRequestProcessor.of(
-            KvUtils.newKvRangeRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newRangeRequest(lKey, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
+        return RangeRequestProcessor.of(
+            KvUtils.newRangeRequest(lKey, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newRangeResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -120,12 +119,11 @@
         pKey.retain();
         lKey.retain();
         value.retain();
-        return TableRequestProcessor.of(
-            KvUtils.newKvPutRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newPutRequest(lKey, value, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
+        return PutRequestProcessor.of(
+            KvUtils.newPutRequest(lKey, value, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newPutResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -145,12 +143,11 @@
         if (null != option.endKey()) {
             option.endKey().retain();
         }
-        return TableRequestProcessor.of(
-            KvUtils.newKvDeleteRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newDeleteRequest(lKey, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
+        return DeleteRequestProcessor.of(
+            KvUtils.newDeleteRequest(lKey, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newDeleteResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -170,12 +167,11 @@
                                                                           IncrementOption<ByteBuf> option) {
         pKey.retain();
         lKey.retain();
-        return TableRequestProcessor.of(
-            KvUtils.newKvIncrementRequest(
-                scChannel.getStorageContainerId(),
-                KvUtils.newIncrementRequest(lKey, amount, option)
-                    .setHeader(newRoutingHeader(pKey))),
-            response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
+        return IncrementRequestProcessor.of(
+            KvUtils.newIncrementRequest(lKey, amount, option)
+                .setHeader(newRoutingHeader(pKey))
+                .build(),
+            response -> KvUtils.newIncrementResult(response, resultFactory, kvFactory),
             scChannel,
             executor,
             backoffPolicy
@@ -248,11 +244,9 @@ public void close() {
 
         @Override
         public CompletableFuture<TxnResult<ByteBuf, ByteBuf>> commit() {
-            return TableRequestProcessor.of(
-                KvUtils.newKvTxnRequest(
-                    scChannel.getStorageContainerId(),
-                    txnBuilder.setHeader(newRoutingHeader(pKey))),
-                response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
+            return TxnRequestProcessor.of(
+                txnBuilder.setHeader(newRoutingHeader(pKey)).build(),
+                response -> KvUtils.newKvTxnResult(response, resultFactory, kvFactory),
                 scChannel,
                 executor,
                 backoffPolicy
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
new file mode 100644
index 000000000..38ae6fcf2
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class PutRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<PutRequest, PutResponse, RespT> {
+
+    public static <T> PutRequestProcessor<T> of(
+        PutRequest request,
+        Function<PutResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new PutRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final PutRequest request;
+    private final Function<PutResponse, RespT> responseFunc;
+
+    private PutRequestProcessor(PutRequest request,
+                                Function<PutResponse, RespT> respFunc,
+                                StorageContainerChannel channel,
+                                ScheduledExecutorService executor,
+                                Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected PutRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<PutResponse> sendRPC(StorageServerChannel rsChannel,
+                                                    PutRequest request) {
+        return rsChannel.getTableService().put(request);
+    }
+
+    @Override
+    protected RespT processResponse(PutResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
new file mode 100644
index 000000000..de2ac87e5
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class RangeRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<RangeRequest, RangeResponse, RespT> {
+
+    public static <T> RangeRequestProcessor<T> of(
+        RangeRequest request,
+        Function<RangeResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new RangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final RangeRequest request;
+    private final Function<RangeResponse, RespT> responseFunc;
+
+    private RangeRequestProcessor(RangeRequest request,
+                                  Function<RangeResponse, RespT> respFunc,
+                                  StorageContainerChannel channel,
+                                  ScheduledExecutorService executor,
+                                  Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected RangeRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<RangeResponse> sendRPC(StorageServerChannel rsChannel,
+                                                      RangeRequest request) {
+        return rsChannel.getTableService().range(request);
+    }
+
+    @Override
+    protected RespT processResponse(RangeResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
deleted file mode 100644
index e697d3d49..000000000
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.clients.impl.kv;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Function;
-import org.apache.bookkeeper.clients.exceptions.InternalServerException;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Request Processor processing table request.
- */
-public class TableRequestProcessor<RespT>
-    extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
-
-    public static <T> TableRequestProcessor<T> of(
-        StorageContainerRequest request,
-        Function<StorageContainerResponse, T> responseFunc,
-        StorageContainerChannel channel,
-        ScheduledExecutorService executor,
-        Policy backoffPolicy) {
-        return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
-    }
-
-    private final StorageContainerRequest request;
-    private final Function<StorageContainerResponse, RespT> responseFunc;
-
-    private TableRequestProcessor(StorageContainerRequest request,
-                                  Function<StorageContainerResponse, RespT> respFunc,
-                                  StorageContainerChannel channel,
-                                  ScheduledExecutorService executor,
-                                  Policy backoffPolicy) {
-        super(channel, executor, backoffPolicy);
-        this.request = request;
-        this.responseFunc = respFunc;
-    }
-
-    @Override
-    protected StorageContainerRequest createRequest() {
-        return request;
-    }
-
-    @Override
-    protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
-                                                                 StorageContainerRequest request) {
-        switch (request.getRequestCase()) {
-            case KV_RANGE_REQ:
-                return rsChannel.getTableService().range(request);
-            case KV_PUT_REQ:
-                return rsChannel.getTableService().put(request);
-            case KV_DELETE_REQ:
-                return rsChannel.getTableService().delete(request);
-            case KV_INCR_REQ:
-                return rsChannel.getTableService().increment(request);
-            case KV_TXN_REQ:
-                return rsChannel.getTableService().txn(request);
-            default:
-                SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
-                respFuture.setException(new Exception("Unknown request " + request));
-                return respFuture;
-        }
-    }
-
-    @Override
-    protected RespT processResponse(StorageContainerResponse response) throws Exception {
-        if (StatusCode.SUCCESS == response.getCode()) {
-            return responseFunc.apply(response);
-        }
-        throw new InternalServerException("Encountered internal server exception : code = "
-            + response.getCode());
-    }
-}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
new file mode 100644
index 000000000..a3ed20925
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class TxnRequestProcessor<RespT>
+    extends ListenableFutureRpcProcessor<TxnRequest, TxnResponse, RespT> {
+
+    public static <T> TxnRequestProcessor<T> of(
+        TxnRequest request,
+        Function<TxnResponse, T> responseFunc,
+        StorageContainerChannel channel,
+        ScheduledExecutorService executor,
+        Policy backoffPolicy) {
+        return new TxnRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+    }
+
+    private final TxnRequest request;
+    private final Function<TxnResponse, RespT> responseFunc;
+
+    private TxnRequestProcessor(TxnRequest request,
+                                Function<TxnResponse, RespT> respFunc,
+                                StorageContainerChannel channel,
+                                ScheduledExecutorService executor,
+                                Policy backoffPolicy) {
+        super(channel, executor, backoffPolicy);
+        this.request = request;
+        this.responseFunc = respFunc;
+    }
+
+    @Override
+    protected TxnRequest createRequest() {
+        return request;
+    }
+
+    @Override
+    protected ListenableFuture<TxnResponse> sendRPC(StorageServerChannel rsChannel,
+                                                    TxnRequest request) {
+        return rsChannel.getTableService().txn(request);
+    }
+
+    @Override
+    protected RespT processResponse(TxnResponse response) throws Exception {
+        if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+            return responseFunc.apply(response);
+        }
+        throw new InternalServerException("Encountered internal server exception : code = "
+            + response.getHeader().getCode());
+    }
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
new file mode 100644
index 000000000..8b01b9b3e
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DeleteRequestProcessor}.
+ */
+public class DeleteRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected DeleteRangeResponse newSuccessResponse() {
+        return DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected DeleteRangeRequest newRequest() {
+        return DeleteRangeRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        DeleteRangeResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void delete(DeleteRangeRequest request,
+                               StreamObserver<DeleteRangeResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<DeleteRangeResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        DeleteRangeRequest request = newRequest();
+
+        DeleteRequestProcessor<String> processor = DeleteRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
new file mode 100644
index 000000000..1b1ec4933
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link IncrementRequestProcessor}.
+ */
+public class IncrementRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected IncrementResponse newSuccessResponse() {
+        return IncrementResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected IncrementRequest newRequest() {
+        return IncrementRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        IncrementResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void increment(IncrementRequest request,
+                                  StreamObserver<IncrementResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<IncrementResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        IncrementRequest request = newRequest();
+
+        IncrementRequestProcessor<String> processor = IncrementRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
new file mode 100644
index 000000000..3481a26b9
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link PutRequestProcessor}.
+ */
+public class PutRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected PutResponse newSuccessResponse() {
+        return PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected PutRequest newRequest() {
+        return PutRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        PutResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void put(PutRequest request,
+                            StreamObserver<PutResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<PutResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        PutRequest request = newRequest();
+
+        PutRequestProcessor<String> processor = PutRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
new file mode 100644
index 000000000..9e53df7f3
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link RangeRequestProcessor}.
+ */
+public class RangeRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected RangeResponse newSuccessResponse() {
+        return RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected RangeRequest newRequest() {
+        return RangeRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        RangeResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void range(RangeRequest request,
+                              StreamObserver<RangeResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<RangeResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        RangeRequest request = newRequest();
+
+        RangeRequestProcessor<String> processor = RangeRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
deleted file mode 100644
index bec76e519..000000000
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.clients.impl.kv;
-
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import lombok.Cleanup;
-import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ClientConstants;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.junit.Test;
-
-/**
- * Unit test of {@link TableRequestProcessor}.
- */
-public class TableRequestProcessorTest extends GrpcClientTestBase {
-
-    @Override
-    protected void doSetup() throws Exception {
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-    }
-
-    @Test
-    public void testProcessRangeRequest() throws Exception {
-        testProcess(KV_RANGE_REQ);
-    }
-
-    @Test
-    public void testProcessPutRequest() throws Exception {
-        testProcess(KV_PUT_REQ);
-    }
-
-    @Test
-    public void testProcessDeleteRequest() throws Exception {
-        testProcess(KV_DELETE_REQ);
-    }
-
-    @Test
-    public void testProcessIncrementRequest() throws Exception {
-        testProcess(KV_INCR_REQ);
-    }
-
-    @Test
-    public void testProcessTxnRequest() throws Exception {
-        testProcess(KV_TXN_REQ);
-    }
-
-    private void testProcess(RequestCase type) throws Exception {
-        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-
-        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
-
-        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
-        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
-
-        StorageContainerResponse.Builder respBuilder = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS);
-
-        switch (type) {
-            case KV_PUT_REQ:
-                respBuilder.setKvPutResp(PutResponse.newBuilder().build());
-                break;
-            case KV_DELETE_REQ:
-                respBuilder.setKvDeleteResp(DeleteRangeResponse.newBuilder().build());
-                break;
-            case KV_RANGE_REQ:
-                respBuilder.setKvRangeResp(RangeResponse.newBuilder().build());
-                break;
-            case KV_INCR_REQ:
-                respBuilder.setKvIncrResp(IncrementResponse.newBuilder().build());
-                break;
-            case KV_TXN_REQ:
-                respBuilder.setKvTxnResp(TxnResponse.newBuilder().build());
-                break;
-            default:
-                break;
-        }
-        StorageContainerResponse response = respBuilder.build();
-
-        AtomicReference<StorageContainerRequest> receivedRequest = new AtomicReference<>(null);
-        AtomicReference<RequestCase> receivedRequestType = new AtomicReference<>(null);
-        TableServiceImplBase tableService = new TableServiceImplBase() {
-            @Override
-            public void range(StorageContainerRequest request,
-                              StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_RANGE_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void put(StorageContainerRequest request,
-                            StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_PUT_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void delete(StorageContainerRequest request,
-                               StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_DELETE_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void txn(StorageContainerRequest request,
-                            StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_TXN_REQ);
-                complete(responseObserver);
-            }
-
-            @Override
-            public void increment(StorageContainerRequest request,
-                                  StreamObserver<StorageContainerResponse> responseObserver) {
-                receivedRequest.set(request);
-                receivedRequestType.set(KV_INCR_REQ);
-                complete(responseObserver);
-            }
-
-            private void complete(StreamObserver<StorageContainerResponse> responseStreamObserver) {
-                responseStreamObserver.onNext(response);
-                responseStreamObserver.onCompleted();
-            }
-        };
-        serviceRegistry.addService(tableService.bindService());
-        StorageServerChannel ssChannel = new StorageServerChannel(
-            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
-            Optional.empty());
-        serverChannelFuture.complete(ssChannel);
-
-        StorageContainerRequest.Builder requestBuilder = StorageContainerRequest.newBuilder();
-        switch (type) {
-            case KV_PUT_REQ:
-                requestBuilder.setKvPutReq(PutRequest.newBuilder().build());
-                break;
-            case KV_DELETE_REQ:
-                requestBuilder.setKvDeleteReq(DeleteRangeRequest.newBuilder().build());
-                break;
-            case KV_RANGE_REQ:
-                requestBuilder.setKvRangeReq(RangeRequest.newBuilder().build());
-                break;
-            case KV_INCR_REQ:
-                requestBuilder.setKvIncrReq(IncrementRequest.newBuilder().build());
-                break;
-            case KV_TXN_REQ:
-                requestBuilder.setKvTxnReq(TxnRequest.newBuilder().build());
-                break;
-            default:
-                break;
-        }
-        StorageContainerRequest request = requestBuilder.build();
-
-        TableRequestProcessor<String> processor = TableRequestProcessor.of(
-            request,
-            resp -> "test",
-            scChannel,
-            scheduler,
-            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
-        assertEquals("test", FutureUtils.result(processor.process()));
-        assertSame(request, receivedRequest.get());
-        assertEquals(type, receivedRequestType.get());
-    }
-
-}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
index 3cff0a6c4..d2df33411 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
@@ -17,16 +17,8 @@
 import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newDeleteRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvDeleteRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvPutRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvRangeRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newPutRequest;
 import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newRangeRequest;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -45,7 +37,6 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.junit.Test;
 
 /**
@@ -88,26 +79,6 @@ public void testNewRangeRequest() {
         }
     }
 
-    @Test
-    public void testNewKvRangeRequest() {
-        try (RangeOption<ByteBuf> rangeOption = optionFactory.newRangeOption()
-            .endKey(key.retainedDuplicate())
-            .countOnly(true)
-            .keysOnly(true)
-            .limit(10)
-            .maxCreateRev(1234L)
-            .minCreateRev(234L)
-            .maxModRev(2345L)
-            .minModRev(1235L)
-            .build()) {
-            RangeRequest.Builder rrBuilder = newRangeRequest(key, rangeOption);
-            StorageContainerRequest request = newKvRangeRequest(scId, rrBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_RANGE_REQ, request.getRequestCase());
-            assertEquals(rrBuilder.build(), request.getKvRangeReq());
-        }
-    }
-
     @Test
     public void testNewPutRequest() {
         try (PutOption<ByteBuf> option = Options.putAndGet()) {
@@ -119,17 +90,6 @@ public void testNewPutRequest() {
         }
     }
 
-    @Test
-    public void testNewKvPutRequest() {
-        try (PutOption<ByteBuf> option = Options.putAndGet()) {
-            PutRequest.Builder putBuilder = newPutRequest(key, value, option);
-            StorageContainerRequest request = newKvPutRequest(scId, putBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_PUT_REQ, request.getRequestCase());
-            assertEquals(putBuilder.build(), request.getKvPutReq());
-        }
-    }
-
     @Test
     public void testNewIncrementRequest() {
         try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
@@ -141,17 +101,6 @@ public void testNewIncrementRequest() {
         }
     }
 
-    @Test
-    public void testNewKvIncrementRequest() {
-        try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
-            IncrementRequest.Builder incrBuilder = newIncrementRequest(key, 100L, option);
-            StorageContainerRequest request = newKvIncrementRequest(scId, incrBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_INCR_REQ, request.getRequestCase());
-            assertEquals(incrBuilder.build(), request.getKvIncrReq());
-        }
-    }
-
     @Test
     public void testNewDeleteRequest() {
         try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
@@ -166,18 +115,4 @@ public void testNewDeleteRequest() {
         }
     }
 
-    @Test
-    public void testNewKvDeleteRequest() {
-        try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
-            .endKey(key.retainedDuplicate())
-            .prevKv(true)
-            .build()) {
-            DeleteRangeRequest.Builder delBuilder = newDeleteRequest(key, option);
-            StorageContainerRequest request = newKvDeleteRequest(scId, delBuilder);
-            assertEquals(scId, request.getScId());
-            assertEquals(KV_DELETE_REQ, request.getRequestCase());
-            assertEquals(delBuilder.build(), request.getKvDeleteReq());
-        }
-    }
-
 }
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
new file mode 100644
index 000000000..2efecd0e3
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link TxnRequestProcessor}.
+ */
+public class TxnRequestProcessorTest extends GrpcClientTestBase {
+
+    @Override
+    protected void doSetup() throws Exception {
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    protected TxnResponse newSuccessResponse() {
+        return TxnResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .build())
+            .build();
+    }
+
+    protected TxnRequest newRequest() {
+        return TxnRequest.newBuilder()
+            .build();
+    }
+
+    @Test
+    public void testProcess() throws Exception {
+        @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+        CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+        when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+        TxnResponse response = newSuccessResponse();
+
+        AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void txn(TxnRequest request,
+                            StreamObserver<TxnResponse> responseObserver) {
+                receivedRequest.set(request);
+                complete(responseObserver);
+            }
+
+            private void complete(StreamObserver<TxnResponse> responseStreamObserver) {
+                responseStreamObserver.onNext(response);
+                responseStreamObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+        StorageServerChannel ssChannel = new StorageServerChannel(
+            InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty());
+        serverChannelFuture.complete(ssChannel);
+
+        TxnRequest request = newRequest();
+
+        TxnRequestProcessor<String> processor = TxnRequestProcessor.of(
+            request,
+            resp -> "test",
+            scChannel,
+            scheduler,
+            ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+        assertEquals("test", FutureUtils.result(processor.process()));
+        assertSame(request, receivedRequest.get());
+    }
+
+}
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
index 35ed903e0..687388f10 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
@@ -42,7 +42,6 @@
 import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -224,25 +223,17 @@ public static GetStorageContainerEndpointResponse createGetStorageContainerEndpo
     // Meta Range API
     //
 
-    private static StorageContainerRequest.Builder newScRequestBuilder(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId);
-    }
-
-    public static StorageContainerRequest createGetActiveRangesRequest(long scId, long streamId) {
-        return newScRequestBuilder(scId)
-            .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
-                .setStreamId(streamId))
-            .build();
+    public static GetActiveRangesRequest createGetActiveRangesRequest(long streamId) {
+        return GetActiveRangesRequest.newBuilder()
+                .setStreamId(streamId)
+                .build();
     }
 
-    public static StorageContainerRequest createGetActiveRangesRequest(long scId,
-                                                                       StreamProperties streamProps) {
-        return newScRequestBuilder(scId)
-            .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
+    public static GetActiveRangesRequest createGetActiveRangesRequest(StreamProperties streamProps) {
+        return GetActiveRangesRequest.newBuilder()
                 .setStreamId(streamProps.getStreamId())
-                .setStreamProps(streamProps))
-            .build();
+                .setStreamProps(streamProps)
+                .build();
     }
 
     //
diff --git a/stream/proto/src/main/proto/kv_rpc.proto b/stream/proto/src/main/proto/kv_rpc.proto
index c324bcb5b..01e5cb6a5 100644
--- a/stream/proto/src/main/proto/kv_rpc.proto
+++ b/stream/proto/src/main/proto/kv_rpc.proto
@@ -18,6 +18,7 @@
 syntax = "proto3";
 
 import "kv.proto";
+import "storage.proto";
 
 package bookkeeper.proto.kv.rpc;
 
@@ -34,7 +35,7 @@ message RoutingHeader {
   bytes r_key           = 3;
 }
 
-service KV {
+service TableService {
   // Range gets the keys in the range from the key-value store.
   // NOT supported yet.
   rpc Range(RangeRequest) returns (RangeResponse) {}
@@ -47,7 +48,7 @@ service KV {
   // DeleteRange deletes the given range from the key-value store.
   // A delete request increments the revision of the key-value store
   // and generates a delete event in the event history for every deleted key.
-  rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
+  rpc Delete(DeleteRangeRequest) returns (DeleteRangeResponse) {}
 
   // Txn processes multiple requests in a single transaction.
   // A txn request increments the revision of the key-value store
@@ -55,20 +56,17 @@ service KV {
   // It is not allowed to modify the same key several times within one txn.
   rpc Txn(TxnRequest) returns (TxnResponse) {}
 
+  // Increment increments the amount associated with the keys
+  rpc Increment(IncrementRequest) returns (IncrementResponse) {}
+
 }
 
 message ResponseHeader {
-  // cluster_id is the ID of the cluster which sent the response.
-  uint64 cluster_id = 1;
-  // member_id is the ID of the member which sent the response.
-  uint64 member_id = 2;
-  // revision is the key-value store revision when the request was applied.
-  int64 revision = 3;
-  // raft_term is the raft term when the request was applied.
-  uint64 raft_term = 4;
+  // Status Code
+  storage.StatusCode code               = 1;
 
   // routing header
-  RoutingHeader routing_header = 99;
+  RoutingHeader routing_header  = 99;
 }
 
 message RangeRequest {
diff --git a/stream/proto/src/main/proto/storage.proto b/stream/proto/src/main/proto/storage.proto
index b84ec2edf..7570d29f5 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -19,7 +19,6 @@ syntax = "proto3";
 
 import "common.proto";
 import "stream.proto";
-import "kv_rpc.proto";
 
 package bookkeeper.proto.storage;
 
@@ -87,7 +86,8 @@ message GetActiveRangesRequest {
 }
 
 message GetActiveRangesResponse {
-    repeated RelatedRanges ranges       = 1;
+    StatusCode code                     = 1;
+    repeated RelatedRanges ranges       = 2;
 }
 
 enum RelationType {
@@ -104,7 +104,7 @@ message RelatedRanges {
 
 // public service for other operations in range server
 service MetaRangeService {
-    rpc GetActiveRanges(StorageContainerRequest)        returns (StorageContainerResponse);
+    rpc GetActiveRanges(GetActiveRangesRequest)        returns (GetActiveRangesResponse);
 }
 
 //
@@ -187,18 +187,6 @@ service RootRangeService {
     rpc GetStream(GetStreamRequest) returns (GetStreamResponse);
 }
 
-//
-// KV Service
-//
-
-service TableService {
-    rpc Range(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Put(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Delete(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Increment(StorageContainerRequest) returns (StorageContainerResponse) {}
-    rpc Txn(StorageContainerRequest) returns (StorageContainerResponse) {}
-}
-
 //
 // StorageContainerService
 //
@@ -234,35 +222,3 @@ service StorageContainerService {
     // Get the storage container endpoints
     rpc GetStorageContainerEndpoint(GetStorageContainerEndpointRequest) returns (GetStorageContainerEndpointResponse);
 }
-
-message StorageContainerRequest {
-    int64 sc_id                 = 1;
-
-    oneof request {
-        // stream metadata operations
-        GetActiveRangesRequest get_active_ranges_req            = 200;
-
-        // kv operations
-        kv.rpc.RangeRequest kv_range_req                        = 400;
-        kv.rpc.PutRequest kv_put_req                            = 401;
-        kv.rpc.DeleteRangeRequest kv_delete_req                 = 402;
-        kv.rpc.TxnRequest kv_txn_req                            = 403;
-        kv.rpc.IncrementRequest kv_incr_req                     = 404;
-    }
-}
-
-message StorageContainerResponse {
-    StatusCode code             = 1;
-
-    oneof response {
-        // stream metadata operations
-        GetActiveRangesResponse get_active_ranges_resp          = 200;
-
-        // kv operations
-        kv.rpc.RangeResponse kv_range_resp                      = 400;
-        kv.rpc.PutResponse kv_put_resp                          = 401;
-        kv.rpc.DeleteRangeResponse kv_delete_resp               = 402;
-        kv.rpc.TxnResponse kv_txn_resp                          = 403;
-        kv.rpc.IncrementResponse kv_incr_resp                   = 404;
-    }
-}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
index 1eaac62c8..a584a37ad 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
@@ -18,22 +18,30 @@
 package org.apache.bookkeeper.stream.storage.api.kv;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 
 /**
  * The table store that stores and serves tables.
  */
 public interface TableStore {
 
-    CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request);
+    CompletableFuture<RangeResponse> range(RangeRequest request);
 
-    CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request);
+    CompletableFuture<PutResponse> put(PutRequest request);
 
-    CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request);
+    CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request);
 
-    CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request);
+    CompletableFuture<TxnResponse> txn(TxnRequest request);
 
-    CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request);
+    CompletableFuture<IncrementResponse> incr(IncrementRequest request);
 
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
index 4215dd5bf..7ef664d01 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
@@ -19,8 +19,8 @@
 package org.apache.bookkeeper.stream.storage.api.metadata;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 
 /**
  * The metadata store that store ranges.
@@ -33,6 +33,6 @@
      * @param request the request
      * @return the active ranges
      */
-    CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request);
+    CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request);
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index 85b69362a..1fb0e448a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -19,11 +19,12 @@
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
 
 /**
  * The gRPC protocol based range service.
@@ -43,10 +44,17 @@ public GrpcMetaRangeService(RangeStoreService service) {
     //
 
     @Override
-    public void getActiveRanges(StorageContainerRequest request,
-                                StreamObserver<StorageContainerResponse> responseObserver) {
+    public void getActiveRanges(GetActiveRangesRequest request,
+                                StreamObserver<GetActiveRangesResponse> responseObserver) {
         rangeStore.getActiveRanges(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<GetActiveRangesResponse>(responseObserver) {
+                @Override
+                protected GetActiveRangesResponse createErrorResp(Throwable cause) {
+                    return GetActiveRangesResponse.newBuilder()
+                        .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                        .build();
+                }
+            });
     }
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
index 39c30cb98..f26a7bd96 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
@@ -19,11 +19,21 @@
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
 
 /**
  * The gRPC protocol based k/v service.
@@ -39,35 +49,87 @@ public GrpcTableService(RangeStoreService store) {
     }
 
     @Override
-    public void range(StorageContainerRequest request,
-                      StreamObserver<StorageContainerResponse> responseObserver) {
+    public void range(RangeRequest request,
+                      StreamObserver<RangeResponse> responseObserver) {
         rangeStore.range(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<RangeResponse>(responseObserver) {
+                @Override
+                protected RangeResponse createErrorResp(Throwable cause) {
+                    return RangeResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void put(StorageContainerRequest request,
-                    StreamObserver<StorageContainerResponse> responseObserver) {
+    public void put(PutRequest request,
+                    StreamObserver<PutResponse> responseObserver) {
         rangeStore.put(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<PutResponse>(responseObserver) {
+                @Override
+                protected PutResponse createErrorResp(Throwable cause) {
+                    return PutResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void delete(StorageContainerRequest request,
-                       StreamObserver<StorageContainerResponse> responseObserver) {
+    public void delete(DeleteRangeRequest request,
+                       StreamObserver<DeleteRangeResponse> responseObserver) {
         rangeStore.delete(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<DeleteRangeResponse>(responseObserver) {
+                @Override
+                protected DeleteRangeResponse createErrorResp(Throwable cause) {
+                    return DeleteRangeResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void txn(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+    public void txn(TxnRequest request,
+                    StreamObserver<TxnResponse> responseObserver) {
         rangeStore.txn(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<TxnResponse>(responseObserver) {
+                @Override
+                protected TxnResponse createErrorResp(Throwable cause) {
+                    return TxnResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 
     @Override
-    public void increment(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+    public void increment(IncrementRequest request,
+                          StreamObserver<IncrementResponse> responseObserver) {
         rangeStore.incr(request).whenComplete(
-            StorageContainerResponseHandler.of(responseObserver));
+            new ResponseHandler<IncrementResponse>(responseObserver) {
+                @Override
+                protected IncrementResponse createErrorResp(Throwable cause) {
+                    return IncrementResponse.newBuilder()
+                        .setHeader(ResponseHeader.newBuilder()
+                            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                            .setRoutingHeader(request.getHeader())
+                            .build())
+                        .build();
+                }
+            });
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
deleted file mode 100644
index 42c21a407..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Response handler to handle storage container response.
- */
-public class StorageContainerResponseHandler extends ResponseHandler<StorageContainerResponse> {
-
-    public static StorageContainerResponseHandler of(StreamObserver<StorageContainerResponse> respObserver) {
-        return new StorageContainerResponseHandler(respObserver);
-    }
-
-    private StorageContainerResponseHandler(StreamObserver<StorageContainerResponse> respObserver) {
-        super(respObserver);
-    }
-
-    @Override
-    protected StorageContainerResponse createErrorResp(Throwable cause) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
-            .build();
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
index d7ef678a7..9a9670ae0 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
@@ -19,7 +19,6 @@
 
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.fromProtoCompare;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.handleCause;
-import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.mvccCodeToStatusCode;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.newStoreKey;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processDeleteResult;
 import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processIncrementResult;
@@ -46,17 +45,18 @@
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 
 /**
@@ -72,13 +72,11 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        RangeRequest rangeReq = request.getKvRangeReq();
-
+    public CompletableFuture<RangeResponse> range(RangeRequest rangeReq) {
         if (log.isTraceEnabled()) {
             log.trace("Received range request {}", rangeReq);
         }
-        return range(rangeReq)
+        return doRange(rangeReq)
             .thenApply(result -> {
                 try {
                     RangeResponse rangeResp = processRangeResult(
@@ -89,19 +87,18 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
                     result.close();
                 }
             })
-            .thenApply(rangeResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvRangeResp(rangeResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process range request {}", rangeReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return RangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(rangeReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<RangeResult<byte[], byte[]>> range(RangeRequest request) {
+    private CompletableFuture<RangeResult<byte[], byte[]>> doRange(RangeRequest request) {
         RangeOp<byte[], byte[]> op = buildRangeOp(request.getHeader(), request);
         return store.range(op)
             .whenComplete((rangeResult, throwable) -> op.close());
@@ -143,10 +140,8 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        PutRequest putReq = request.getKvPutReq();
-
-        return put(putReq)
+    public CompletableFuture<PutResponse> put(PutRequest putReq) {
+        return doPut(putReq)
             .thenApply(result -> {
                 try {
                     return processPutResult(
@@ -156,19 +151,18 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
                     result.close();
                 }
             })
-            .thenApply(putResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvPutResp(putResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process put request {}", putReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return PutResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(putReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<PutResult<byte[], byte[]>> put(PutRequest request) {
+    private CompletableFuture<PutResult<byte[], byte[]>> doPut(PutRequest request) {
         PutOp<byte[], byte[]> op = buildPutOp(request.getHeader(), request);
         return store.put(op)
             .whenComplete((putResult, throwable) -> op.close());
@@ -187,10 +181,8 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        IncrementRequest incrementReq = request.getKvIncrReq();
-
-        return increment(incrementReq)
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest incrementReq) {
+        return doIncrement(incrementReq)
             .thenApply(result -> {
                 try {
                     return processIncrementResult(
@@ -200,19 +192,18 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
                     result.close();
                 }
             })
-            .thenApply(incrementResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvIncrResp(incrementResp)
-                .build())
             .exceptionally(cause -> {
                 log.error("Failed to process increment request {}", incrementReq, cause);
-                return StorageContainerResponse.newBuilder()
-                    .setCode(handleCause(cause))
+                return IncrementResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(handleCause(cause))
+                        .setRoutingHeader(incrementReq.getHeader())
+                        .build())
                     .build();
             });
     }
 
-    private CompletableFuture<IncrementResult<byte[], byte[]>> increment(IncrementRequest request) {
+    private CompletableFuture<IncrementResult<byte[], byte[]>> doIncrement(IncrementRequest request) {
         IncrementOp<byte[], byte[]> op = buildIncrementOp(request.getHeader(), request);
         return store.increment(op)
             .whenComplete((incrementResult, throwable) -> op.close());
@@ -231,10 +222,8 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        DeleteRangeRequest deleteReq = request.getKvDeleteReq();
-
-        return delete(deleteReq)
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest deleteReq) {
+        return doDelete(deleteReq)
             .thenApply(result -> {
                 try {
                     return processDeleteResult(
@@ -244,16 +233,15 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
                     result.close();
                 }
             })
-            .thenApply(deleteResp -> StorageContainerResponse.newBuilder()
-                .setCode(StatusCode.SUCCESS)
-                .setKvDeleteResp(deleteResp)
-                .build())
-            .exceptionally(cause -> StorageContainerResponse.newBuilder()
-                .setCode(handleCause(cause))
+            .exceptionally(cause -> DeleteRangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(handleCause(cause))
+                    .setRoutingHeader(deleteReq.getHeader())
+                    .build())
                 .build());
     }
 
-    private CompletableFuture<DeleteResult<byte[], byte[]>> delete(DeleteRangeRequest request) {
+    private CompletableFuture<DeleteResult<byte[], byte[]>> doDelete(DeleteRangeRequest request) {
         DeleteOp<byte[], byte[]> op = buildDeleteOp(request.getHeader(), request);
         return store.delete(op)
             .whenComplete((deleteResult, throwable) -> op.close());
@@ -277,30 +265,27 @@ public TableStoreImpl(MVCCAsyncStore<byte[], byte[]> store) {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        TxnRequest txnReq = request.getKvTxnReq();
-
+    public CompletableFuture<TxnResponse> txn(TxnRequest txnReq) {
         if (log.isTraceEnabled()) {
             log.trace("Received txn request : {}", txnReq);
         }
-        return txn(txnReq)
+        return doTxn(txnReq)
             .thenApply(txnResult -> {
                 try {
-                    TxnResponse txnResponse = processTxnResult(txnReq.getHeader(), txnResult);
-                    return StorageContainerResponse.newBuilder()
-                        .setCode(mvccCodeToStatusCode(txnResult.code()))
-                        .setKvTxnResp(txnResponse)
-                        .build();
+                    return processTxnResult(txnReq.getHeader(), txnResult);
                 } finally {
                     txnResult.close();
                 }
             })
-            .exceptionally(cause -> StorageContainerResponse.newBuilder()
-                .setCode(handleCause(cause))
+            .exceptionally(cause -> TxnResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(handleCause(cause))
+                    .setRoutingHeader(txnReq.getHeader())
+                    .build())
                 .build());
     }
 
-    private CompletableFuture<TxnResult<byte[], byte[]>> txn(TxnRequest request) {
+    private CompletableFuture<TxnResult<byte[], byte[]>> doTxn(TxnRequest request) {
         TxnOp<byte[], byte[]> op = buildTxnOp(request);
         return store.txn(op)
             .whenComplete((txnResult, throwable) -> op.close());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
index a9a8dcec9..964ca621a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
@@ -158,6 +158,7 @@ static PutResponse processPutResult(RoutingHeader routingHeader,
         ByteString rKey = routingHeader.getRKey();
         PutResponse.Builder putRespBuilder = PutResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build());
         if (null != result.prevKv()) {
@@ -170,6 +171,7 @@ static IncrementResponse processIncrementResult(RoutingHeader routingHeader,
                                                     IncrementResult<byte[], byte[]> result) {
         IncrementResponse.Builder putRespBuilder = IncrementResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setTotalAmount(result.totalAmount());
@@ -182,6 +184,7 @@ static RangeResponse processRangeResult(RoutingHeader routingHeader,
         return RangeResponse.newBuilder()
             .setCount(result.count())
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .addAllKvs(Lists.transform(result.kvs(), kv -> newKeyValue(rKey, kv)))
@@ -194,6 +197,7 @@ static DeleteRangeResponse processDeleteResult(RoutingHeader routingHeader,
         ByteString rKey = routingHeader.getRKey();
         return DeleteRangeResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(result.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setDeleted(result.numDeleted())
@@ -205,6 +209,7 @@ static TxnResponse processTxnResult(RoutingHeader routingHeader,
                                         TxnResult<byte[], byte[]> txnResult) {
         return TxnResponse.newBuilder()
             .setHeader(ResponseHeader.newBuilder()
+                .setCode(mvccCodeToStatusCode(txnResult.code()))
                 .setRoutingHeader(routingHeader)
                 .build())
             .setSucceeded(txnResult.isSuccess())
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 62502a252..530d4cdce 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -18,9 +18,6 @@
 
 package org.apache.bookkeeper.stream.storage.impl.metadata;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.GET_ACTIVE_RANGES_REQ;
-
 import com.google.common.collect.Maps;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -30,12 +27,11 @@
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.RelationType;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
@@ -70,11 +66,11 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
     // Stream API
     //
 
-    private CompletableFuture<StorageContainerResponse> createStreamIfMissing(long streamId,
-                                                                              MetaRangeImpl metaRange,
-                                                                              StreamProperties streamProps) {
+    private CompletableFuture<GetActiveRangesResponse> createStreamIfMissing(long streamId,
+                                                                             MetaRangeImpl metaRange,
+                                                                             StreamProperties streamProps) {
         if (null == streamProps) {
-            return FutureUtils.value(StorageContainerResponse.newBuilder()
+            return FutureUtils.value(GetActiveRangesResponse.newBuilder()
                 .setCode(StatusCode.STREAM_NOT_FOUND)
                 .build());
         }
@@ -86,7 +82,7 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                 }
                 return getActiveRanges(metaRange);
             } else {
-                return FutureUtils.value(StorageContainerResponse.newBuilder()
+                return FutureUtils.value(GetActiveRangesResponse.newBuilder()
                     .setCode(StatusCode.INTERNAL_SERVER_ERROR)
                     .build());
             }
@@ -94,11 +90,8 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        checkState(
-            GET_ACTIVE_RANGES_REQ == request.getRequestCase(),
-            "Wrong request type: %s", request.getRequestCase());
-        final long streamId = request.getGetActiveRangesReq().getStreamId();
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+        final long streamId = request.getStreamId();
 
         MetaRangeImpl metaRange = streams.get(streamId);
 
@@ -107,8 +100,8 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
             return metaRangeImpl.load(streamId)
                 .thenCompose(mr -> {
                     if (null == mr) {
-                        StreamProperties streamProps = request.getGetActiveRangesReq().hasStreamProps()
-                            ? request.getGetActiveRangesReq().getStreamProps() : null;
+                        StreamProperties streamProps = request.hasStreamProps()
+                            ? request.getStreamProps() : null;
                         return createStreamIfMissing(streamId, metaRangeImpl, streamProps);
                     } else {
                         synchronized (streams) {
@@ -122,8 +115,7 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
         }
     }
 
-    private CompletableFuture<StorageContainerResponse> getActiveRanges(MetaRange metaRange) {
-        StorageContainerResponse.Builder scBuilder = StorageContainerResponse.newBuilder();
+    private CompletableFuture<GetActiveRangesResponse> getActiveRanges(MetaRange metaRange) {
         GetActiveRangesResponse.Builder respBuilder = GetActiveRangesResponse.newBuilder();
         return metaRange.getActiveRanges()
             .thenApplyAsync(ranges -> {
@@ -134,12 +126,11 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                         .addAllRelatedRanges(range.getParentsList());
                     respBuilder.addRanges(rrBuilder);
                 }
-                return scBuilder
+                return respBuilder
                     .setCode(StatusCode.SUCCESS)
-                    .setGetActiveRangesResp(respBuilder)
                     .build();
             }, executor)
-            .exceptionally(cause -> scBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
+            .exceptionally(cause -> respBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
     }
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
index e15b808f0..4a1be5564 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
@@ -18,13 +18,21 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -33,12 +41,12 @@
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
@@ -56,11 +64,10 @@ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
         this.scheduler = scheduler;
     }
 
-    private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
+    private <T> CompletableFuture<T> failWrongGroupRequest() {
         CompletableFuture<T> future = FutureUtils.createFuture();
-        scheduler.executeOrdered(scId, () -> {
-            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
-        });
+        scheduler.execute(() ->
+            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND)));
         return future;
     }
 
@@ -80,17 +87,17 @@ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
 
     @Override
     public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     //
@@ -99,17 +106,17 @@ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
 
     @Override
     public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     @Override
     public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
-        return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+        return failWrongGroupRequest();
     }
 
     //
@@ -117,8 +124,8 @@ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
     //
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+        return failWrongGroupRequest();
     }
 
     //
@@ -127,27 +134,27 @@ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
 
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<RangeResponse> range(RangeRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<PutResponse> put(PutRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+        return failWrongGroupRequest();
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        return failWrongGroupRequest(request.getScId());
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+        return failWrongGroupRequest();
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 0304fc89e..7adcefdb7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -18,12 +18,6 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +34,16 @@
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,12 +52,12 @@
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
@@ -232,7 +231,7 @@ public void close() {
     //
 
     @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
+    public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
         return mgStore.getActiveRanges(request);
     }
 
@@ -242,12 +241,8 @@ public void close() {
 
 
     @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        checkArgument(KV_RANGE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        RangeRequest rr = request.getKvRangeReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<RangeResponse> range(RangeRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -260,12 +255,8 @@ public void close() {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        checkArgument(KV_PUT_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        PutRequest rr = request.getKvPutReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<PutResponse> put(PutRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -278,12 +269,8 @@ public void close() {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        checkArgument(KV_DELETE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        DeleteRangeRequest rr = request.getKvDeleteReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -296,12 +283,8 @@ public void close() {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        checkArgument(KV_TXN_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        TxnRequest rr = request.getKvTxnReq();
-        RoutingHeader header = rr.getHeader();
+    public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
@@ -314,12 +297,8 @@ public void close() {
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        checkArgument(KV_INCR_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        IncrementRequest ir = request.getKvIncrReq();
-        RoutingHeader header = ir.getHeader();
+    public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+        RoutingHeader header = request.getHeader();
 
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index a269466d1..929311718 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -16,7 +16,6 @@
 
 import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenableFuture;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createDeleteNamespaceRequest;
@@ -46,6 +45,7 @@
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -64,6 +64,8 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -72,6 +74,8 @@
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
@@ -81,10 +85,6 @@
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -140,6 +140,7 @@ private static Endpoint createEndpoint(String hostname,
     private TableServiceFutureStub tableService;
     private RootRangeServiceFutureStub rootRangeService;
     private MetaRangeServiceFutureStub metaRangeService;
+    private long scId;
 
     //
     // Utils for table api
@@ -156,64 +157,52 @@ private static Endpoint createEndpoint(String hostname,
         .setRoutingHeader(TEST_ROUTING_HEADER)
         .build();
 
-    private static StorageContainerRequest createPutRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvPutReq(PutRequest.newBuilder()
+    private static PutRequest createPutRequest() {
+        return PutRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
                 .setValue(TEST_VAL)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createPutResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvPutResp(PutResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
-                .build())
-            .build();
+    private static PutResponse createPutResponse(StatusCode code) {
+        return PutResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
+                .build();
     }
 
-    private static StorageContainerRequest createRangeRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvRangeReq(RangeRequest.newBuilder()
+    private static RangeRequest createRangeRequest() {
+        return RangeRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createRangeResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvRangeResp(RangeResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
+    private static RangeResponse createRangeResponse(StatusCode code) {
+        return RangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
                 .setCount(0)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerRequest createDeleteRequest(long scId) {
-        return StorageContainerRequest.newBuilder()
-            .setScId(scId)
-            .setKvDeleteReq(DeleteRangeRequest.newBuilder()
+    private static DeleteRangeRequest createDeleteRequest() {
+        return DeleteRangeRequest.newBuilder()
                 .setHeader(TEST_ROUTING_HEADER)
                 .setKey(TEST_KEY)
-                .build())
-            .build();
+                .build();
     }
 
-    private static StorageContainerResponse createDeleteResponse(StatusCode code) {
-        return StorageContainerResponse.newBuilder()
-            .setCode(code)
-            .setKvDeleteResp(DeleteRangeResponse.newBuilder()
-                .setHeader(TEST_RESP_HEADER)
+    private static DeleteRangeResponse createDeleteResponse(StatusCode code) {
+        return DeleteRangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+                    .setCode(code)
+                    .build())
                 .setDeleted(0)
-                .build())
-            .build();
+                .build();
     }
 
     @SuppressWarnings("unchecked")
@@ -263,10 +252,12 @@ public void setUp() throws Exception {
             .usePlaintext()
             .build();
 
+        scId = ThreadLocalRandom.current().nextInt(2);
+
         // intercept the channel with storage container information.
         channel = ClientInterceptors.intercept(
             channel,
-            new StorageContainerClientInterceptor(0L));
+            new StorageContainerClientInterceptor(scId));
 
 
         tableService = TableServiceGrpc.newFutureStub(channel);
@@ -307,7 +298,7 @@ public void tearDown() {
 
     @Test
     public void testCreateNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-create-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -317,7 +308,7 @@ public void testCreateNamespaceNoRootStorageContainerStore() throws Exception {
 
     @Test
     public void testDeleteNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -327,7 +318,7 @@ public void testDeleteNamespaceNoRootStorageContainerStore() throws Exception {
 
     @Test
     public void testGetNamespaceNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-get-namespace-no-root-storage-container-store";
         verifyNotFoundException(fromListenableFuture(
@@ -395,7 +386,7 @@ public void testGetNamespaceMockRootStorageContainerStore() throws Exception {
 
     @Test
     public void testCreateStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-create-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -406,7 +397,7 @@ public void testCreateStreamNoRootStorageContainerStore() throws Exception {
 
     @Test
     public void testDeleteStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -417,7 +408,7 @@ public void testDeleteStreamNoRootStorageContainerStore() throws Exception {
 
     @Test
     public void testGetStreamNoRootStorageContainerStore() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         String colName = "test-get-namespace-no-root-storage-container-store";
         String streamName = colName;
@@ -482,26 +473,24 @@ public void testGetStreamMockRootStorageContainerStore() throws Exception {
 
     @Test
     public void testGetActiveRangesNoManager() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            metaRangeService.getActiveRanges(createGetActiveRangesRequest(12L, 34L))),
+            metaRangeService.getActiveRanges(createGetActiveRangesRequest(34L))),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testGetActiveRangesMockManager() throws Exception {
-        long scId = System.currentTimeMillis();
-
-        StorageContainerResponse resp = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse resp = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.STREAM_NOT_FOUND)
             .build();
-        StorageContainerRequest request = createGetActiveRangesRequest(scId, 34L);
+        GetActiveRangesRequest request = createGetActiveRangesRequest(34L);
 
         when(mockRangeStoreService.getActiveRanges(request))
             .thenReturn(CompletableFuture.completedFuture(resp));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<GetActiveRangesResponse> future = fromListenableFuture(
             metaRangeService.getActiveRanges(request));
         verify(mockRangeStoreService, times(1)).getActiveRanges(request);
         assertTrue(resp == future.get());
@@ -514,40 +503,40 @@ public void testGetActiveRangesMockManager() throws Exception {
 
     @Test
     public void testPutNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.put(createPutRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testDeleteNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.delete(createDeleteRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testRangeNoStorageContainer() throws Exception {
-        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+        rangeStore.getRegistry().stopStorageContainer(scId).join();
 
         verifyNotFoundException(fromListenableFuture(
-            tableService.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID))),
+            tableService.range(createRangeRequest())),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testRangeMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createRangeResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createRangeRequest(ROOT_STORAGE_CONTAINER_ID);
+        RangeResponse response = createRangeResponse(StatusCode.SUCCESS);
+        RangeRequest request = createRangeRequest();
 
         when(mockRangeStoreService.range(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<RangeResponse> future = fromListenableFuture(
             tableService.range(request));
         verify(mockRangeStoreService, times(1)).range(eq(request));
         assertTrue(response == future.get());
@@ -555,13 +544,13 @@ public void testRangeMockStorageContainer() throws Exception {
 
     @Test
     public void testDeleteMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createDeleteResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createDeleteRequest(ROOT_STORAGE_CONTAINER_ID);
+        DeleteRangeResponse response = createDeleteResponse(StatusCode.SUCCESS);
+        DeleteRangeRequest request = createDeleteRequest();
 
         when(mockRangeStoreService.delete(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<DeleteRangeResponse> future = fromListenableFuture(
             tableService.delete(request));
         verify(mockRangeStoreService, times(1)).delete(eq(request));
         assertTrue(response == future.get());
@@ -569,13 +558,13 @@ public void testDeleteMockStorageContainer() throws Exception {
 
     @Test
     public void testPutMockStorageContainer() throws Exception {
-        StorageContainerResponse response = createPutResponse(StatusCode.SUCCESS);
-        StorageContainerRequest request = createPutRequest(ROOT_STORAGE_CONTAINER_ID);
+        PutResponse response = createPutResponse(StatusCode.SUCCESS);
+        PutRequest request = createPutRequest();
 
         when(mockRangeStoreService.put(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+        CompletableFuture<PutResponse> future = fromListenableFuture(
             tableService.put(request));
         verify(mockRangeStoreService, times(1)).put(eq(request));
         assertTrue(response == future.get());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 178ac19ab..3f87b9e9d 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -30,8 +30,6 @@
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
@@ -51,23 +49,19 @@ public void testGetActiveRangesSuccess() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
-            .setGetActiveRangesResp(GetActiveRangesResponse.newBuilder())
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
@@ -82,22 +76,19 @@ public void testGetActiveRangesFailure() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
+        GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
@@ -112,18 +103,15 @@ public void testGetActiveRangesException() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        GetActiveRangesRequest request = GetActiveRangesRequest
             .newBuilder()
-            .setGetActiveRangesReq(GetActiveRangesRequest
-                .newBuilder()
-                .setStreamId(23456L)
-                .build())
+            .setStreamId(23456L)
             .build();
 
         when(rangeService.getActiveRanges(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<GetActiveRangesResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.getActiveRanges(
             request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index e943fb447..874e69ba3 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -34,10 +34,9 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
@@ -66,24 +65,24 @@ public void testPutSuccess() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvPutResp(PutResponse.newBuilder())
+        PutResponse response = PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.put(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -98,23 +97,24 @@ public void testPutFailure() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        PutResponse response = PutResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.put(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -129,19 +129,17 @@ public void testPutException() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        PutRequest request = PutRequest
             .newBuilder()
-            .setKvPutReq(PutRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setValue(TEST_VAL)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setValue(TEST_VAL)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.put(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<PutResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.put(
             request,
@@ -156,23 +154,23 @@ public void testRangeSuccess() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvRangeResp(RangeResponse.newBuilder())
+        RangeResponse response = RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.range(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -187,22 +185,23 @@ public void testRangeActiveRangesFailure() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        RangeResponse response = RangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.range(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -217,18 +216,16 @@ public void testRangeActiveRangesException() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        RangeRequest request = RangeRequest
             .newBuilder()
-            .setKvRangeReq(RangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.range(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<RangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.range(
             request,
@@ -243,23 +240,23 @@ public void testDeleteSuccess() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .setKvDeleteResp(DeleteRangeResponse.newBuilder())
+        DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.SUCCESS)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             CompletableFuture.completedFuture(response));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
@@ -274,22 +271,23 @@ public void testDeleteFailure() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+        DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+            .setHeader(ResponseHeader.newBuilder()
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .setRoutingHeader(ROUTING_HEADER)
+                .build())
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             FutureUtils.exception(CAUSE));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
@@ -304,18 +302,16 @@ public void testDeleteException() throws Exception {
         RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
-        StorageContainerRequest request = StorageContainerRequest
+        DeleteRangeRequest request = DeleteRangeRequest
             .newBuilder()
-            .setKvDeleteReq(DeleteRangeRequest
-                .newBuilder()
-                .setKey(TEST_KEY)
-                .setHeader(ROUTING_HEADER))
+            .setKey(TEST_KEY)
+            .setHeader(ROUTING_HEADER)
             .build();
 
         when(rangeService.delete(request)).thenReturn(
             FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
 
-        TestResponseObserver<StorageContainerResponse> responseObserver =
+        TestResponseObserver<DeleteRangeResponse> responseObserver =
             new TestResponseObserver<>();
         grpcService.delete(
             request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
deleted file mode 100644
index 3eb440a1a..000000000
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.exceptions.StorageException;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-/**
- * Unit test for {@link StorageContainerResponseHandler}.
- */
-public class TestStorageContainerResponseHandler {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSuccessResponse() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StorageContainerResponse response = StorageContainerResponse.newBuilder()
-            .setCode(StatusCode.SUCCESS)
-            .build();
-        handler.accept(response, null);
-        verify(observer, times(1)).onNext(response);
-        verify(observer, times(1)).onCompleted();
-        verify(observer, times(0)).onError(any());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testStatusRuntimeException() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StatusRuntimeException exception = new StatusRuntimeException(Status.NOT_FOUND);
-        handler.accept(null, exception);
-        verify(observer, times(0)).onNext(any());
-        verify(observer, times(0)).onCompleted();
-        verify(observer, times(1)).onError(exception);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testStatusException() {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StatusException exception = new StatusException(Status.NOT_FOUND);
-        handler.accept(null, exception);
-        verify(observer, times(0)).onNext(any());
-        verify(observer, times(0)).onCompleted();
-        verify(observer, times(1)).onError(exception);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testInternalError() throws Exception {
-        StreamObserver<StorageContainerResponse> observer =
-            mock(StreamObserver.class);
-        AtomicReference<StorageContainerResponse> responseHolder =
-            new AtomicReference<>(null);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer((Answer<Void>) invocation -> {
-            StorageContainerResponse resp = invocation.getArgument(0);
-            responseHolder.set(resp);
-            latch.countDown();
-            return null;
-        }).when(observer).onNext(any(StorageContainerResponse.class));
-        StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
-        StorageException exception = new StorageException("test-exception");
-        handler.accept(null, exception);
-        verify(observer, times(1)).onNext(any());
-        verify(observer, times(1)).onCompleted();
-        verify(observer, times(0)).onError(any());
-
-        latch.await();
-        assertNotNull(responseHolder.get());
-        assertEquals(
-            StatusCode.INTERNAL_SERVER_ERROR,
-            responseHolder.get().getCode());
-    }
-
-}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
index f7b88fc05..2c7124f8e 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
@@ -46,9 +46,6 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse.ResponseCase;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
 import org.junit.Test;
 
@@ -90,16 +87,14 @@ private ByteString getValue(int i) {
     }
 
     private List<KeyValue> writeKVs(int numPairs, boolean prevKv) throws Exception {
-        List<CompletableFuture<StorageContainerResponse>> results =
+        List<CompletableFuture<PutResponse>> results =
             Lists.newArrayListWithExpectedSize(numPairs);
         for (int i = 0; i < numPairs; i++) {
             results.add(writeKV(i, prevKv));
         }
         return Lists.transform(
-            result(FutureUtils.collect(results)), response -> {
-                assertEquals(StatusCode.SUCCESS, response.getCode());
-                assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
-                PutResponse putResp = response.getKvPutResp();
+            result(FutureUtils.collect(results)), putResp -> {
+                assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
                 assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
                 if (putResp.hasPrevKv()) {
                     return putResp.getPrevKv();
@@ -109,33 +104,26 @@ private ByteString getValue(int i) {
             });
     }
 
-    private CompletableFuture<StorageContainerResponse> writeKV(int i, boolean prevKv) {
-        return tableStore.put(StorageContainerRequest.newBuilder()
-            .setScId(SC_ID)
-            .setKvPutReq(PutRequest.newBuilder()
-                .setKey(getKey(i))
-                .setValue(getValue(i))
-                .setHeader(HEADER)
-                .setPrevKv(prevKv))
+    private CompletableFuture<PutResponse> writeKV(int i, boolean prevKv) {
+        return tableStore.put(PutRequest.newBuilder()
+            .setKey(getKey(i))
+            .setValue(getValue(i))
+            .setHeader(HEADER)
+            .setPrevKv(prevKv)
             .build());
     }
 
-    StorageContainerResponse getKeyFromTableStore(int i) throws Exception {
+    RangeResponse getKeyFromTableStore(int i) throws Exception {
         return result(
-            tableStore.range(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvRangeReq(RangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(i))
-                    .build())
+            tableStore.range(RangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(i))
                 .build()));
     }
 
     KeyValue getKeyValue(int i) throws Exception {
-        StorageContainerResponse resp = getKeyFromTableStore(i);
-        assertEquals(StatusCode.SUCCESS, resp.getCode());
-        assertEquals(ResponseCase.KV_RANGE_RESP, resp.getResponseCase());
-        RangeResponse rr = resp.getKvRangeResp();
+        RangeResponse rr = getKeyFromTableStore(i);
+        assertEquals(StatusCode.SUCCESS, rr.getHeader().getCode());
         assertEquals(HEADER, rr.getHeader().getRoutingHeader());
         assertFalse(rr.getMore());
         if (0 == rr.getCount()) {
@@ -146,53 +134,43 @@ KeyValue getKeyValue(int i) throws Exception {
     }
 
     void putKeyToTableStore(int key, int value) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.put(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvPutReq(PutRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(key))
-                    .setValue(getValue(value))
-                    .build())
+        PutResponse putResp = result(
+            tableStore.put(PutRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(key))
+                .setValue(getValue(value))
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
-        PutResponse putResp = response.getKvPutResp();
+        assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
         assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
         assertFalse(putResp.hasPrevKv());
     }
 
     KeyValue putIfAbsentToTableStore(int key, int value, boolean expectedSuccess) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.VALUE)
+        TxnResponse txnResp = result(
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.VALUE)
+                    .setKey(getKey(key))
+                    .setValue(ByteString.copyFrom(new byte[0])))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setValue(ByteString.copyFrom(new byte[0])))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .build()))
                 .build()));
 
-        assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
-        TxnResponse txnResp = response.getKvTxnResp();
         assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
-        assertEquals(StatusCode.SUCCESS, response.getCode());
+        assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
 
         ResponseOp respOp = txnResp.getResponses(0);
         if (expectedSuccess) {
@@ -215,39 +193,34 @@ KeyValue putIfAbsentToTableStore(int key, int value, boolean expectedSuccess) th
         }
     }
 
-    StorageContainerResponse vPutToTableStore(int key, int value, long version)
+    TxnResponse vPutToTableStore(int key, int value, long version)
         throws Exception {
         return result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.VERSION)
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.VERSION)
+                    .setKey(getKey(key))
+                    .setVersion(version))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setVersion(version))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .build()))
                 .build()));
     }
 
-    KeyValue verifyVPutResponse(StorageContainerResponse response, boolean expectedSuccess) throws Exception {
-        assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
-        TxnResponse txnResp = response.getKvTxnResp();
+    KeyValue verifyVPutResponse(TxnResponse txnResp, boolean expectedSuccess) throws Exception {
         assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
-        assertEquals(StatusCode.SUCCESS, response.getCode());
+        assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
 
         ResponseOp respOp = txnResp.getResponses(0);
         if (expectedSuccess) {
@@ -270,89 +243,71 @@ KeyValue verifyVPutResponse(StorageContainerResponse response, boolean expectedS
         }
     }
 
-    StorageContainerResponse rPutToTableStore(int key, int value, long revision)
+    TxnResponse rPutToTableStore(int key, int value, long revision)
         throws Exception {
         return result(
-            tableStore.txn(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvTxnReq(TxnRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .addCompare(Compare.newBuilder()
-                        .setResult(CompareResult.EQUAL)
-                        .setTarget(CompareTarget.MOD)
+            tableStore.txn(TxnRequest.newBuilder()
+                .setHeader(HEADER)
+                .addCompare(Compare.newBuilder()
+                    .setResult(CompareResult.EQUAL)
+                    .setTarget(CompareTarget.MOD)
+                    .setKey(getKey(key))
+                    .setModRevision(revision))
+                .addSuccess(RequestOp.newBuilder()
+                    .setRequestPut(PutRequest.newBuilder()
+                        .setHeader(HEADER)
+                        .setKey(getKey(key))
+                        .setValue(getValue(value))
+                        .setPrevKv(true)
+                        .build()))
+                .addFailure(RequestOp.newBuilder()
+                    .setRequestRange(RangeRequest.newBuilder()
+                        .setHeader(HEADER)
                         .setKey(getKey(key))
-                        .setModRevision(revision))
-                    .addSuccess(RequestOp.newBuilder()
-                        .setRequestPut(PutRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .setValue(getValue(value))
-                            .setPrevKv(true)
-                            .build()))
-                    .addFailure(RequestOp.newBuilder()
-                        .setRequestRange(RangeRequest.newBuilder()
-                            .setHeader(HEADER)
-                            .setKey(getKey(key))
-                            .build()))
-                    .build())
+                        .build()))
                 .build()));
     }
 
     KeyValue deleteKeyFromTableStore(int key) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.delete(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvDeleteReq(DeleteRangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(key))
-                    .setPrevKv(true)
-                    .build())
+        DeleteRangeResponse response = result(
+            tableStore.delete(DeleteRangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(key))
+                .setPrevKv(true)
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
-        DeleteRangeResponse delResp = response.getKvDeleteResp();
-        assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
-        if (0 == delResp.getPrevKvsCount()) {
+        assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
+        assertEquals(HEADER, response.getHeader().getRoutingHeader());
+        if (0 == response.getPrevKvsCount()) {
             return null;
         } else {
-            return delResp.getPrevKvs(0);
+            return response.getPrevKvs(0);
         }
     }
 
     List<KeyValue> deleteRange(int startKey, int endKey) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.delete(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvDeleteReq(DeleteRangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(startKey))
-                    .setRangeEnd(getKey(endKey))
-                    .setPrevKv(true)
-                    .build())
+        DeleteRangeResponse delResp = result(
+            tableStore.delete(DeleteRangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(startKey))
+                .setRangeEnd(getKey(endKey))
+                .setPrevKv(true)
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
-        DeleteRangeResponse delResp = response.getKvDeleteResp();
+        assertEquals(StatusCode.SUCCESS, delResp.getHeader().getCode());
         assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
         return delResp.getPrevKvsList();
     }
 
     List<KeyValue> range(int startKey, int endKey) throws Exception {
-        StorageContainerResponse response = result(
-            tableStore.range(StorageContainerRequest.newBuilder()
-                .setScId(SC_ID)
-                .setKvRangeReq(RangeRequest.newBuilder()
-                    .setHeader(HEADER)
-                    .setKey(getKey(startKey))
-                    .setRangeEnd(getKey(endKey))
-                    .build())
+        RangeResponse rangeResp = result(
+            tableStore.range(RangeRequest.newBuilder()
+                .setHeader(HEADER)
+                .setKey(getKey(startKey))
+                .setRangeEnd(getKey(endKey))
                 .build()));
 
-        assertEquals(StatusCode.SUCCESS, response.getCode());
-        assertEquals(ResponseCase.KV_RANGE_RESP, response.getResponseCase());
-        RangeResponse rangeResp = response.getKvRangeResp();
+        assertEquals(StatusCode.SUCCESS, rangeResp.getHeader().getCode());
         assertEquals(HEADER, rangeResp.getHeader().getRoutingHeader());
         return rangeResp.getKvsList();
     }
@@ -398,18 +353,18 @@ public void testBasicOps() throws Exception {
             int key = 2;
             int initialVal = 2;
             int casVal = 99;
-            StorageContainerResponse response = vPutToTableStore(key, initialVal, 100L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            TxnResponse response = vPutToTableStore(key, initialVal, 100L);
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // vPut(k, v, -1L)
             response = vPutToTableStore(key, initialVal, -1L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
             // put(key2, v)
             KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
             assertNull(prevKV);
             // vPut(key2, v, 0)
             response = vPutToTableStore(key, casVal, 0);
-            assertEquals(StatusCode.SUCCESS, response.getCode());
+            assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
             prevKV = verifyVPutResponse(response, true);
             assertNotNull(prevKV);
             assertEquals(getKey(key), prevKV.getKey());
@@ -428,12 +383,12 @@ public void testBasicOps() throws Exception {
             int initialVal = 3;
             int casVal = 99;
 
-            StorageContainerResponse response = rPutToTableStore(key, initialVal, 100L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            TxnResponse response = rPutToTableStore(key, initialVal, 100L);
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // rPut(k, v, -1L)
             response = rPutToTableStore(key, initialVal, -1L);
-            assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+            assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
 
             // put(key2, v)
             KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
@@ -445,7 +400,7 @@ public void testBasicOps() throws Exception {
 
             // rPut(key2, v, 0)
             response = rPutToTableStore(key, casVal, revision);
-            assertEquals(StatusCode.SUCCESS, response.getCode());
+            assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
 
             kv = getKeyValue(key);
             assertEquals(revision + 1, kv.getModRevision());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d2ed9b60a..d0156c55d 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -36,8 +36,6 @@
 import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.storage.impl.metadata.stream.MetaRangeImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
@@ -69,21 +67,18 @@ protected void doSetup() throws Exception {
     protected void doTeardown() throws Exception {
     }
 
-    StorageContainerRequest createRequest(StreamProperties streamProperties) {
+    GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
         GetActiveRangesRequest.Builder reqBuilder = GetActiveRangesRequest.newBuilder()
             .setStreamId(this.streamProps.getStreamId());
         if (null != streamProperties) {
             reqBuilder = reqBuilder.setStreamProps(streamProperties);
         }
-        return StorageContainerRequest.newBuilder()
-            .setGetActiveRangesReq(reqBuilder)
-            .setScId(1L)
-            .build();
+        return reqBuilder.build();
     }
 
     @Test
     public void testCreateIfMissingPropsNotSpecified() throws Exception {
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(null)));
 
         assertEquals(StatusCode.STREAM_NOT_FOUND, resp.getCode());
@@ -91,13 +86,11 @@ public void testCreateIfMissingPropsNotSpecified() throws Exception {
 
     @Test
     public void testCreateIfMissing() throws Exception {
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, resp.getCode());
-        GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
-
-        verifyGetResponse(getResp);
+        verifyGetResponse(resp);
     }
 
     private void verifyGetResponse(GetActiveRangesResponse getResp) throws Exception {
@@ -222,22 +215,19 @@ private void verifyRangeMetadata(RangeMetadata metadata,
 
     @Test
     public void testGetTwice() throws Exception {
-
-        StorageContainerResponse resp = FutureUtils.result(
+        GetActiveRangesResponse resp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, resp.getCode());
 
-        GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
-        verifyGetResponse(getResp);
+        verifyGetResponse(resp);
 
-        StorageContainerResponse secondResp = FutureUtils.result(
+        GetActiveRangesResponse secondResp = FutureUtils.result(
             this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
 
         assertEquals(StatusCode.SUCCESS, secondResp.getCode());
 
-        GetActiveRangesResponse secondGetResp = resp.getGetActiveRangesResp();
-        verifyGetResponse(secondGetResp);
+        verifyGetResponse(secondResp);
     }
 
 }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
index a4d8710d6..2cbb27fa4 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
@@ -17,10 +17,6 @@
  */
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +36,15 @@
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,13 +53,12 @@
 import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
@@ -322,11 +321,11 @@ public void testGetStream() throws Exception {
     public void testGetActiveRanges() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(mrStore.getActiveRanges(any(StorageContainerRequest.class)))
+        GetActiveRangesResponse expectedResp = GetActiveRangesResponse.getDefaultInstance();
+        when(mrStore.getActiveRanges(any(GetActiveRangesRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest expectedReq = StorageContainerRequest.getDefaultInstance();
+        GetActiveRangesRequest expectedReq = GetActiveRangesRequest.getDefaultInstance();
         assertSame(
             expectedResp,
             FutureUtils.result(mrStore.getActiveRanges(expectedReq)));
@@ -338,50 +337,66 @@ public void testGetActiveRanges() throws Exception {
     // Table API
     //
 
-    private StorageContainerRequest newStorageContainerRequest(RequestCase type) {
-        StorageContainerRequest.Builder reqBuilder = StorageContainerRequest.newBuilder()
-            .setScId(SCID);
+    private PutRequest newPutRequest() {
         RoutingHeader header = RoutingHeader.newBuilder()
             .setStreamId(STREAM_ID)
             .setRangeId(RANGE_ID)
             .build();
-        switch (type) {
-            case KV_PUT_REQ:
-                reqBuilder = reqBuilder.setKvPutReq(
-                    PutRequest.newBuilder().setHeader(header));
-                break;
-            case KV_DELETE_REQ:
-                reqBuilder = reqBuilder.setKvDeleteReq(
-                    DeleteRangeRequest.newBuilder().setHeader(header));
-                break;
-            case KV_RANGE_REQ:
-                reqBuilder = reqBuilder.setKvRangeReq(
-                    RangeRequest.newBuilder().setHeader(header));
-                break;
-            case KV_TXN_REQ:
-                reqBuilder = reqBuilder.setKvTxnReq(
-                    TxnRequest.newBuilder().setHeader(header));
-                break;
-            case KV_INCR_REQ:
-                reqBuilder = reqBuilder.setKvIncrReq(
-                    IncrementRequest.newBuilder().setHeader(header));
-                break;
-            default:
-                break;
-        }
-        return reqBuilder.build();
+        return PutRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private DeleteRangeRequest newDeleteRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return DeleteRangeRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private RangeRequest newRangeRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return RangeRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private IncrementRequest newIncrRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return IncrementRequest.newBuilder()
+            .setHeader(header)
+            .build();
+    }
+
+    private TxnRequest newTxnRequest() {
+        RoutingHeader header = RoutingHeader.newBuilder()
+            .setStreamId(STREAM_ID)
+            .setRangeId(RANGE_ID)
+            .build();
+        return TxnRequest.newBuilder()
+            .setHeader(header)
+            .build();
     }
 
     @Test
     public void testRangeWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.range(any(StorageContainerRequest.class)))
+        RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+        when(trStore.range(any(RangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.range(request));
+        RangeRequest request = newRangeRequest();
+        RangeResponse response = FutureUtils.result(container.range(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -390,13 +405,13 @@ public void testRangeWhenTableStoreNotCached() throws Exception {
     public void testRangeWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.range(any(StorageContainerRequest.class)))
+        RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+        when(trStore.range(any(RangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.range(request));
+        RangeRequest request = newRangeRequest();
+        RangeResponse response = FutureUtils.result(container.range(request));
         assertSame(expectedResp, response);
     }
 
@@ -404,12 +419,12 @@ public void testRangeWhenTableStoreCached() throws Exception {
     public void testPutWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.put(any(StorageContainerRequest.class)))
+        PutResponse expectedResp = PutResponse.getDefaultInstance();
+        when(trStore.put(any(PutRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.put(request));
+        PutRequest request = newPutRequest();
+        PutResponse response = FutureUtils.result(container.put(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -418,13 +433,13 @@ public void testPutWhenTableStoreNotCached() throws Exception {
     public void testPutWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.put(any(StorageContainerRequest.class)))
+        PutResponse expectedResp = PutResponse.getDefaultInstance();
+        when(trStore.put(any(PutRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.put(request));
+        PutRequest request = newPutRequest();
+        PutResponse response = FutureUtils.result(container.put(request));
         assertSame(expectedResp, response);
     }
 
@@ -432,12 +447,12 @@ public void testPutWhenTableStoreCached() throws Exception {
     public void testDeleteWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.delete(any(StorageContainerRequest.class)))
+        DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+        when(trStore.delete(any(DeleteRangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.delete(request));
+        DeleteRangeRequest request = newDeleteRequest();
+        DeleteRangeResponse response = FutureUtils.result(container.delete(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -446,13 +461,13 @@ public void testDeleteWhenTableStoreNotCached() throws Exception {
     public void testDeleteWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.delete(any(StorageContainerRequest.class)))
+        DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+        when(trStore.delete(any(DeleteRangeRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.delete(request));
+        DeleteRangeRequest request = newDeleteRequest();
+        DeleteRangeResponse response = FutureUtils.result(container.delete(request));
         assertSame(expectedResp, response);
     }
 
@@ -460,12 +475,12 @@ public void testDeleteWhenTableStoreCached() throws Exception {
     public void testTxnWhenTableStoreNotCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.txn(any(StorageContainerRequest.class)))
+        TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+        when(trStore.txn(any(TxnRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.txn(request));
+        TxnRequest request = newTxnRequest();
+        TxnResponse response = FutureUtils.result(container.txn(request));
         assertSame(expectedResp, response);
         assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
     }
@@ -474,13 +489,13 @@ public void testTxnWhenTableStoreNotCached() throws Exception {
     public void testTxnWhenTableStoreCached() throws Exception {
         mockStorageContainer(SCID);
 
-        StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
-        when(trStore.txn(any(StorageContainerRequest.class)))
+        TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+        when(trStore.txn(any(TxnRequest.class)))
             .thenReturn(FutureUtils.value(expectedResp));
         container.getTableStoreCache().getTableStores().put(RID, trStore);
 
-        StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
-        StorageContainerResponse response = FutureUtils.result(container.txn(request));
+        TxnRequest request = newTxnRequest();
+        TxnResponse response = FutureUtils.result(container.txn(request));
         assertSame(expectedResp, response);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services