You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 22:03:59 UTC

[bookkeeper] 05/09: [TABLE SERVICE] use grpc reverse proxy to serve storage container requests

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 8e4c13b5e12d94a8a6ec921d6dc28a2b0ca4a0ca
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon May 28 18:29:09 2018 -0700

    [TABLE SERVICE] use grpc reverse proxy to serve storage container requests
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    *Changes*
    
    **Client Changes**
    
    Changed `StorageContainerChannel` to add a client interceptor that stamps the storage container id into the requests' metadata.
    
    **Server Changes**
    
    - Moved the stream storage logic out of storage containers to a `service` package. The main logic will be kept in `RangeStoreService` and `RangeStoreServiceFactory`.
    - Made the storage container logic responsible for hosting a `StorageContainerService`.
    - Each storage container will run an in-process grpc server for serving the grpc services registered to the container and provide a `channel` for accessing those grpc services.
    - Changed the server to use reverse proxy for serving/proxying storage container requests.
    
    *NOTE*
    
    This change doesn't directly remove `StorageContainerRequest` and `StorageContainerResponse`. That will be done in a subsequent change.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1448 from sijie/storage_container_channel
---
 .../StorageContainerClientInterceptor.java         |   4 +-
 .../container/TestStorageContainerChannel.java     |  13 +-
 .../bookkeeper/common/util/ListenableFutures.java  |  10 +
 .../stream/protocol/ProtocolConstants.java         |   3 +
 .../bookkeeper/stream/server/StorageServer.java    |   6 +-
 .../bookkeeper/stream/server/grpc/GrpcServer.java  |  28 +-
 .../stream/server/grpc/GrpcServerSpec.java         |   4 +-
 .../server/grpc/GrpcStorageContainerService.java   |  10 +-
 .../stream/server/service/StorageService.java      |  16 +-
 .../stream/server/grpc/TestGrpcServer.java         |   6 +-
 ...{RangeStore.java => StorageContainerStore.java} |  15 +-
 .../storage/api/metadata/RangeStoreService.java    |   6 +
 .../stream/storage/api/sc/StorageContainer.java    |  12 +-
 ...Container.java => StorageContainerService.java} |  36 ++-
 .../api/sc/StorageContainerServiceFactory.java     |  39 +++
 .../api/service/RangeStoreServiceFactory.java      |  41 +++
 .../stream/storage/api/service/package-info.java   |  22 ++
 stream/storage/impl/pom.xml                        |   5 +
 ...lder.java => StorageContainerStoreBuilder.java} |  56 ++--
 .../stream/storage/impl/RangeStoreImpl.java        | 206 -------------
 .../storage/impl/StorageContainerStoreImpl.java    | 113 +++++++
 .../storage/impl/grpc/GrpcMetaRangeService.java    |   2 +-
 .../stream/storage/impl/grpc/GrpcServices.java     |  46 +++
 .../stream/storage/impl/sc/Channel404.java         |  80 +++++
 .../impl/sc/DefaultStorageContainerFactory.java    |  31 +-
 .../storage/impl/sc/StorageContainer404.java       |  63 ++++
 .../storage/impl/sc/StorageContainerImpl.java      | 336 ++++-----------------
 .../impl/sc/StorageContainerRegistryImpl.java      |   8 +-
 .../FailRequestRangeStoreService.java}             |  43 +--
 .../RangeStoreContainerServiceFactoryImpl.java     |  46 +++
 .../service/RangeStoreContainerServiceImpl.java    |  55 ++++
 .../impl/service/RangeStoreServiceFactoryImpl.java |  71 +++++
 .../RangeStoreServiceImpl.java}                    |  60 ++--
 .../stream/storage/impl/service/package-info.java  |  22 ++
 ....java => TestStorageContainerStoreBuilder.java} |  22 +-
 ...mpl.java => TestStorageContainerStoreImpl.java} | 244 ++++++++-------
 .../impl/grpc/TestGrpcMetaRangeService.java        |   8 +-
 .../impl/grpc/TestGrpcRootRangeService.java        |  26 +-
 .../storage/impl/grpc/TestGrpcTableService.java    |  20 +-
 .../sc/TestDefaultStorageContainerFactory.java     |  35 +--
 .../impl/sc/TestStorageContainerRegistryImpl.java  |  23 +-
 .../impl/sc/ZkStorageContainerManagerTest.java     |  15 +-
 .../RangeStoreServiceImplTest.java}                |  12 +-
 43 files changed, 1034 insertions(+), 885 deletions(-)

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
index 284431e..1e1de08 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.clients.impl.container;
 
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -32,8 +34,6 @@ import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
  */
 public class StorageContainerClientInterceptor implements ClientInterceptor {
 
-    private static final String SC_ID_KEY = "SC_ID";
-
     private final long scId;
     private final Metadata.Key<Long> scIdKey;
 
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
index 9d3869b..5623335 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -58,9 +59,9 @@ public class TestStorageContainerChannel extends GrpcClientTestBase {
     private OrderedScheduler scheduler;
     private final LocationClient locationClient = mock(LocationClient.class);
 
-    private StorageServerChannel mockChannel = mock(StorageServerChannel.class);
-    private StorageServerChannel mockChannel2 = mock(StorageServerChannel.class);
-    private StorageServerChannel mockChannel3 = mock(StorageServerChannel.class);
+    private StorageServerChannel mockChannel = newMockServerChannel();
+    private StorageServerChannel mockChannel2 = newMockServerChannel();
+    private StorageServerChannel mockChannel3 = newMockServerChannel();
     private final Endpoint endpoint = Endpoint.newBuilder()
         .setHostname("127.0.0.1")
         .setPort(8181)
@@ -106,6 +107,12 @@ public class TestStorageContainerChannel extends GrpcClientTestBase {
         }
     }
 
+    private StorageServerChannel newMockServerChannel() {
+        StorageServerChannel channel = mock(StorageServerChannel.class);
+        when(channel.intercept(anyLong())).thenReturn(channel);
+        return channel;
+    }
+
     private void ensureCallbackExecuted() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         scheduler.submit(() -> latch.countDown());
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java b/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
index 586a457..29b5fa6 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
@@ -36,6 +36,16 @@ import lombok.NoArgsConstructor;
 public final class ListenableFutures {
 
     /**
+     * Convert a {@link ListenableFuture} to a {@link CompletableFuture}.
+     *
+     * @param listenableFuture listenable future to convert.
+     * @return the completable future.
+     */
+    public static <T> CompletableFuture<T> fromListenableFuture(ListenableFuture<T> listenableFuture) {
+        return fromListenableFuture(listenableFuture, Function.identity());
+    }
+
+    /**
      * Convert a {@link ListenableFuture} to a {@link CompletableFuture} and do a transformation.
      *
      * @param listenableFuture listenable future
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 404ec56..0e6900c 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -18,6 +18,7 @@
 
 package org.apache.bookkeeper.stream.protocol;
 
+import io.grpc.Metadata;
 import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
 import org.apache.bookkeeper.stream.proto.RangeKeyType;
 import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -107,4 +108,6 @@ public final class ProtocolConstants {
             .setSplitPolicy(DEFAULT_SPLIT_POLICY)
             .build();
 
+    // storage container request metadata key
+    public static final String SC_ID_KEY = "sc-id" + Metadata.BINARY_HEADER_SUFFIX;
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 94c8587..e0774de 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -48,7 +48,7 @@ import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
 import org.apache.bookkeeper.stream.server.service.RegistrationStateService;
 import org.apache.bookkeeper.stream.server.service.StatsProviderService;
 import org.apache.bookkeeper.stream.server.service.StorageService;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
+import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
@@ -243,7 +243,7 @@ public class StorageServer {
             rootStatsLogger.scope("dlog"));
 
         // Create range (stream) store
-        RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
             .withStorageConfiguration(storageConf)
             // the storage resources shared across multiple components
@@ -281,7 +281,7 @@ public class StorageServer {
                     storageResources,
                     storageConf.getServeReadOnlyTables()));
         StorageService storageService = new StorageService(
-            storageConf, rangeStoreBuilder, rootStatsLogger.scope("storage"));
+            storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
 
         // Create gRPC server
         StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index ce95ccf..7a5f163 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -18,18 +18,18 @@ import com.google.common.annotations.VisibleForTesting;
 import io.grpc.HandlerRegistry;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.ServerServiceDefinition;
 import io.grpc.inprocess.InProcessServerBuilder;
 import java.io.IOException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.exceptions.StorageServerRuntimeException;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcMetaRangeService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcRootRangeService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcTableService;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
 
 /**
  * KeyRange Server.
@@ -50,15 +50,15 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
     private final Endpoint myEndpoint;
     private final Server grpcServer;
 
-    public GrpcServer(RangeStore rangeStore,
+    public GrpcServer(StorageContainerStore storageContainerStore,
                       StorageServerConfiguration conf,
                       Endpoint myEndpoint,
                       StatsLogger statsLogger) {
-        this(rangeStore, conf, myEndpoint, null, null, statsLogger);
+        this(storageContainerStore, conf, myEndpoint, null, null, statsLogger);
     }
 
     @VisibleForTesting
-    public GrpcServer(RangeStore rangeStore,
+    public GrpcServer(StorageContainerStore storageContainerStore,
                       StorageServerConfiguration conf,
                       Endpoint myEndpoint,
                       String localServerName,
@@ -75,12 +75,14 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
             }
             this.grpcServer = serverBuilder.build();
         } else {
-            this.grpcServer = ServerBuilder
-                .forPort(this.myEndpoint.getPort())
-                .addService(new GrpcRootRangeService(rangeStore))
-                .addService(new GrpcStorageContainerService(rangeStore))
-                .addService(new GrpcMetaRangeService(rangeStore))
-                .addService(new GrpcTableService(rangeStore))
+            ProxyHandlerRegistry.Builder proxyRegistryBuilder = ProxyHandlerRegistry.newBuilder()
+                .setChannelFinder(storageContainerStore);
+            for (ServerServiceDefinition definition : GrpcServices.create(null)) {
+                proxyRegistryBuilder = proxyRegistryBuilder.addService(definition);
+            }
+            this.grpcServer = ServerBuilder.forPort(this.myEndpoint.getPort())
+                .addService(new GrpcStorageContainerService(storageContainerStore))
+                .fallbackHandlerRegistry(proxyRegistryBuilder.build())
                 .build();
         }
     }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
index 5b77b02..ac177e5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
@@ -23,7 +23,7 @@ import lombok.experimental.Accessors;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 
 /**
  * Spec for building a grpc server.
@@ -39,7 +39,7 @@ public class GrpcServerSpec {
      *
      * @return store supplier for building grpc server.
      */
-    Supplier<RangeStore> storeSupplier;
+    Supplier<StorageContainerStore> storeSupplier;
 
     /**
      * Get the storage server configuration.
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
index e750bfc..8b6a26a 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
@@ -23,7 +23,7 @@ import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRes
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 
 /**
  * Grpc based storage container service.
@@ -31,10 +31,10 @@ import org.apache.bookkeeper.stream.storage.api.RangeStore;
 @Slf4j
 class GrpcStorageContainerService extends StorageContainerServiceImplBase {
 
-    private final RangeStore rangeStore;
+    private final StorageContainerStore storageContainerStore;
 
-    GrpcStorageContainerService(RangeStore rangeStore) {
-        this.rangeStore = rangeStore;
+    GrpcStorageContainerService(StorageContainerStore storageContainerStore) {
+        this.storageContainerStore = storageContainerStore;
     }
 
     @Override
@@ -43,7 +43,7 @@ class GrpcStorageContainerService extends StorageContainerServiceImplBase {
         GetStorageContainerEndpointResponse.Builder responseBuilder = GetStorageContainerEndpointResponse.newBuilder()
             .setStatusCode(StatusCode.SUCCESS);
         for (int i = 0; i < request.getRequestsCount(); i++) {
-            Endpoint endpoint = rangeStore
+            Endpoint endpoint = storageContainerStore
                 .getRoutingService()
                 .getStorageContainer(request.getRequests(i).getStorageContainer());
             OneStorageContainerEndpointResponse.Builder oneRespBuilder;
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
index bddd6c1..2ac1804 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
@@ -19,23 +19,23 @@ import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 
 /**
- * Service to run the storage {@link RangeStore}.
+ * Service to run the storage {@link StorageContainerStore}.
  */
 @Slf4j
 public class StorageService
     extends AbstractLifecycleComponent<StorageConfiguration>
-    implements Supplier<RangeStore> {
+    implements Supplier<StorageContainerStore> {
 
-    private final RangeStoreBuilder storeBuilder;
-    private RangeStore store;
+    private final StorageContainerStoreBuilder storeBuilder;
+    private StorageContainerStore store;
 
     public StorageService(StorageConfiguration conf,
-                          RangeStoreBuilder storeBuilder,
+                          StorageContainerStoreBuilder storeBuilder,
                           StatsLogger statsLogger) {
         super("storage-service", conf, statsLogger);
         this.storeBuilder = storeBuilder;
@@ -58,7 +58,7 @@ public class StorageService
     }
 
     @Override
-    public RangeStore get() {
+    public StorageContainerStore get() {
         return store;
     }
 }
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java b/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
index b05640a..4cfbea1 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
+++ b/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
@@ -22,7 +22,7 @@ import io.grpc.util.MutableHandlerRegistry;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.server.StorageServer;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestGrpcServer {
     @Test
     public void testCreateLocalServer() {
         GrpcServer server = new GrpcServer(
-            mock(RangeStoreImpl.class),
+            mock(StorageContainerStoreImpl.class),
             StorageServerConfiguration.of(compConf),
             null,
             name.getMethodName(),
@@ -55,7 +55,7 @@ public class TestGrpcServer {
     @Test
     public void testCreateBindServer() throws Exception {
         GrpcServer server = new GrpcServer(
-            mock(RangeStoreImpl.class),
+            mock(StorageContainerStoreImpl.class),
             StorageServerConfiguration.of(compConf),
             StorageServer.createLocalEndpoint(0, false),
             null,
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
similarity index 68%
rename from stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java
rename to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
index 164275b..61ff588 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
@@ -14,15 +14,15 @@
 
 package org.apache.bookkeeper.stream.storage.api;
 
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.common.grpc.proxy.ChannelFinder;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 
 /**
- * The umbrella interface for accessing ranges (both metadata and data).
+ * The umbrella interface for accessing storage containers.
  */
-public interface RangeStore extends LifecycleComponent, RangeStoreService {
+public interface StorageContainerStore extends LifecycleComponent, ChannelFinder {
 
     /**
      * Get the routing service.
@@ -32,11 +32,10 @@ public interface RangeStore extends LifecycleComponent, RangeStoreService {
     StorageContainerRoutingService getRoutingService();
 
     /**
-     * Choose the executor for a given {@code key}.
+     * Get the container registry.
      *
-     * @param key submit key
-     * @return executor
+     * @return container registry.
      */
-    ScheduledExecutorService chooseExecutor(long key);
+    StorageContainerRegistry getRegistry();
 
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
index 94e63d3..2aeb317 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
@@ -13,6 +13,7 @@
  */
 package org.apache.bookkeeper.stream.storage.api.metadata;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 
 /**
@@ -22,4 +23,9 @@ public interface RangeStoreService
     extends MetaRangeStore,
     RootRangeStore,
     TableStore {
+
+    CompletableFuture<Void> start();
+
+    CompletableFuture<Void> stop();
+
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
index 47855b0..83c5dcd 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
@@ -18,16 +18,15 @@
 
 package org.apache.bookkeeper.stream.storage.api.sc;
 
+import io.grpc.Channel;
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
  * A {@code StorageContainer} is a service container that can encapsulate metadata and data operations.
  *
  * <p>A {@link StorageContainer} is typically implemented by replicated state machine backed by a log.
  */
-public interface StorageContainer
-    extends AutoCloseable, RangeStoreService {
+public interface StorageContainer extends AutoCloseable {
 
     /**
      * Get the storage container id.
@@ -37,6 +36,13 @@ public interface StorageContainer
     long getId();
 
     /**
+     * Get the grpc channel to interact with grpc services registered in this container.
+     *
+     * @return grpc channel.
+     */
+    Channel getChannel();
+
+    /**
      * Start the storage container.
      *
      * @return a future represents the result of starting a storage container.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
similarity index 57%
copy from stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
index 47855b0..7014a73 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,41 +18,39 @@
 
 package org.apache.bookkeeper.stream.storage.api.sc;
 
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
- * A {@code StorageContainer} is a service container that can encapsulate metadata and data operations.
+ * Represents the service running within a container.
  *
- * <p>A {@link StorageContainer} is typically implemented by replicated state machine backed by a log.
+ * <p>The implementation implements their core business logic and the framework provides resources like tables/streams
+ * for it.
  */
-public interface StorageContainer
-    extends AutoCloseable, RangeStoreService {
+public interface StorageContainerService {
 
     /**
-     * Get the storage container id.
+     * Return the registered grpc services.
      *
-     * @return the storage container id.
+     * <p>The framework registers the grpc services in the container runtime. So the framework will know how to route
+     * the grpc requests to the actual grpc services.
+     * @return
      */
-    long getId();
+    Collection<ServerServiceDefinition> getRegisteredServices();
 
     /**
-     * Start the storage container.
+     * Start the storage container service.
      *
-     * @return a future represents the result of starting a storage container.
+     * @return a future represents the result of starting a storage container service.
      */
-    CompletableFuture<StorageContainer> start();
+    CompletableFuture<Void> start();
 
     /**
-     * Stop the storage container.
+     * Stop the storage container service.
      *
-     * @return a future represents the result of stopping a storage container.
+     * @return a future represents the result of stopping a storage container service.
      */
     CompletableFuture<Void> stop();
 
-    /**
-     * Close a storage container.
-     */
-    void close();
-
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java
new file mode 100644
index 0000000..e15f7d7
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.sc;
+
+/**
+ * Factory to create storage container service.
+ */
+public interface StorageContainerServiceFactory extends AutoCloseable {
+
+    /**
+     * Create a storage container service with container id <tt>scId</tt>.
+     *
+     * @param scId storage container id
+     * @return storage container service
+     */
+    StorageContainerService createStorageContainerService(long scId);
+
+    /**
+     * {@inheritDoc}
+     */
+    void close();
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java
new file mode 100644
index 0000000..075fd3e
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.service;
+
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+
+/**
+ * Factory to create range store services.
+ */
+public interface RangeStoreServiceFactory extends AutoCloseable {
+
+    /**
+     * Create a range store service that will be launched at storage container {@code scId}.
+     *
+     * @param scId storage container to run range store service.
+     * @return range store service
+     */
+    RangeStoreService createService(long scId);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    void close();
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java
new file mode 100644
index 0000000..bb88ca2
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes that provide stream storage service logic.
+ */
+package org.apache.bookkeeper.stream.storage.api.service;
\ No newline at end of file
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index b178fc7..4c656f4 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -60,6 +60,11 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
similarity index 72%
rename from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index c03b374..9c96139 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -20,20 +20,22 @@ import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreContainerServiceFactoryImpl;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreServiceFactoryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
  * Builder to build the storage component.
  */
-public final class RangeStoreBuilder {
+public final class StorageContainerStoreBuilder {
 
-    public static RangeStoreBuilder newBuilder() {
-        return new RangeStoreBuilder();
+    public static StorageContainerStoreBuilder newBuilder() {
+        return new StorageContainerStoreBuilder();
     }
 
     private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
@@ -45,18 +47,7 @@ public final class RangeStoreBuilder {
     private MVCCStoreFactory mvccStoreFactory = null;
     private URI defaultBackendUri = null;
 
-    private RangeStoreBuilder() {
-    }
-
-    /**
-     * Build the range store with the provided {@code numStorageContainers}.
-     *
-     * @param numStorageContainers number of the storage containers.
-     * @return range store builder
-     */
-    public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
-        this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
-        return this;
+    private StorageContainerStoreBuilder() {
     }
 
     /**
@@ -65,7 +56,7 @@ public final class RangeStoreBuilder {
      * @param placementPolicyFactory placement policy factor to create placement policies.
      * @return range store builder.
      */
-    public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+    public StorageContainerStoreBuilder withStorageContainerPlacementPolicyFactory(
         StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
         this.placementPolicyFactory = placementPolicyFactory;
         return this;
@@ -77,7 +68,7 @@ public final class RangeStoreBuilder {
      * @param statsLogger stats logger for collecting stats.
      * @return range store builder;
      */
-    public RangeStoreBuilder withStatsLogger(StatsLogger statsLogger) {
+    public StorageContainerStoreBuilder withStatsLogger(StatsLogger statsLogger) {
         if (null == statsLogger) {
             return this;
         }
@@ -91,7 +82,7 @@ public final class RangeStoreBuilder {
      * @param storeConf storage configuration
      * @return range store builder
      */
-    public RangeStoreBuilder withStorageConfiguration(StorageConfiguration storeConf) {
+    public StorageContainerStoreBuilder withStorageConfiguration(StorageConfiguration storeConf) {
         this.storeConf = storeConf;
         return this;
     }
@@ -102,7 +93,7 @@ public final class RangeStoreBuilder {
      * @param scmFactory storage container manager factory.
      * @return range store builder
      */
-    public RangeStoreBuilder withStorageContainerManagerFactory(StorageContainerManagerFactory scmFactory) {
+    public StorageContainerStoreBuilder withStorageContainerManagerFactory(StorageContainerManagerFactory scmFactory) {
         this.scmFactory = scmFactory;
         return this;
     }
@@ -113,7 +104,7 @@ public final class RangeStoreBuilder {
      * @param resources storage resources.
      * @return range store builder.
      */
-    public RangeStoreBuilder withStorageResources(StorageResources resources) {
+    public StorageContainerStoreBuilder withStorageResources(StorageResources resources) {
         this.storeResources = resources;
         return this;
     }
@@ -124,7 +115,7 @@ public final class RangeStoreBuilder {
      * @param storeFactory factory to create range stores.
      * @return range store builder.
      */
-    public RangeStoreBuilder withRangeStoreFactory(MVCCStoreFactory storeFactory) {
+    public StorageContainerStoreBuilder withRangeStoreFactory(MVCCStoreFactory storeFactory) {
         this.mvccStoreFactory = storeFactory;
         return this;
     }
@@ -135,25 +126,32 @@ public final class RangeStoreBuilder {
      * @param uri uri for storing table ranges.
      * @return range store builder.
      */
-    public RangeStoreBuilder withDefaultBackendUri(URI uri) {
+    public StorageContainerStoreBuilder withDefaultBackendUri(URI uri) {
         this.defaultBackendUri = uri;
         return this;
     }
 
-    public RangeStore build() {
+    public StorageContainerStore build() {
         checkNotNull(scmFactory, "StorageContainerManagerFactory is not provided");
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
         checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
 
-        return new RangeStoreImpl(
+        RangeStoreServiceFactoryImpl serviceFactory = new RangeStoreServiceFactoryImpl(
             storeConf,
+            placementPolicyFactory.newPlacementPolicy(),
             storeResources.scheduler(),
-            scmFactory,
             mvccStoreFactory,
-            defaultBackendUri,
-            placementPolicyFactory,
+            defaultBackendUri);
+
+        RangeStoreContainerServiceFactoryImpl containerServiceFactory =
+            new RangeStoreContainerServiceFactoryImpl(serviceFactory);
+
+        return new StorageContainerStoreImpl(
+            storeConf,
+            scmFactory,
+            containerServiceFactory,
             statsLogger);
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
deleted file mode 100644
index f175a55..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl;
-
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.common.util.SharedResourceManager;
-import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
-
-/**
- * KeyRange Service.
- */
-public class RangeStoreImpl
-    extends AbstractLifecycleComponent<StorageConfiguration>
-    implements RangeStore {
-
-    private final Resource<OrderedScheduler> schedulerResource;
-    private final OrderedScheduler scheduler;
-    private final StorageContainerManagerFactory scmFactory;
-    private final StorageContainerRegistryImpl scRegistry;
-    private final StorageContainerManager scManager;
-    private final MVCCStoreFactory storeFactory;
-
-    public RangeStoreImpl(StorageConfiguration conf,
-                          Resource<OrderedScheduler> schedulerResource,
-                          StorageContainerManagerFactory factory,
-                          MVCCStoreFactory mvccStoreFactory,
-                          URI defaultBackendUri,
-                          StorageContainerPlacementPolicy.Factory placementPolicyFactory,
-                          StatsLogger statsLogger) {
-        super("range-service", conf, statsLogger);
-        this.schedulerResource = schedulerResource;
-        this.scheduler = SharedResourceManager.shared().get(schedulerResource);
-        this.scmFactory = factory;
-        this.storeFactory = mvccStoreFactory;
-        StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
-        this.scRegistry = new StorageContainerRegistryImpl(
-            new DefaultStorageContainerFactory(
-                conf,
-                placementPolicy,
-                scheduler,
-                storeFactory,
-                defaultBackendUri),
-            scheduler);
-        this.scManager = scmFactory.create(conf, scRegistry);
-    }
-
-    @Override
-    public ScheduledExecutorService chooseExecutor(long key) {
-        return this.scheduler.chooseThread(key);
-    }
-
-    @VisibleForTesting
-    StorageContainerRegistryImpl getRegistry() {
-        return this.scRegistry;
-    }
-
-    @Override
-    public StorageContainerRoutingService getRoutingService() {
-        return this.scManager;
-    }
-
-    //
-    // Lifecycle management
-    //
-
-    @Override
-    protected void doStart() {
-        this.scManager.start();
-    }
-
-    @Override
-    protected void doStop() {
-        this.scManager.stop();
-        this.scRegistry.close();
-    }
-
-    @Override
-    protected void doClose() throws IOException {
-        this.scManager.close();
-        // stop the core scheduler
-        SharedResourceManager.shared().release(
-            schedulerResource,
-            scheduler);
-    }
-
-    private StorageContainer getStorageContainer(long scId) {
-        return scRegistry.getStorageContainer(scId);
-    }
-
-    //
-    // Root Range Service
-    //
-
-    @Override
-    public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).createNamespace(request);
-    }
-
-    @Override
-    public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).deleteNamespace(request);
-    }
-
-    @Override
-    public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).getNamespace(request);
-    }
-
-    @Override
-    public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).createStream(request);
-    }
-
-    @Override
-    public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).deleteStream(request);
-    }
-
-    @Override
-    public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
-        return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).getStream(request);
-    }
-
-    //
-    // Stream Meta Range Service
-    //
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).getActiveRanges(request);
-    }
-
-    //
-    // Table Service
-    //
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).range(request);
-    }
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).put(request);
-    }
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).delete(request);
-    }
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).txn(request);
-    }
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        return getStorageContainer(request.getScId()).incr(request);
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
new file mode 100644
index 0000000..56fcba2
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl;
+
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import java.io.IOException;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
+
+/**
+ * The default {@link StorageContainerStore}, which routes each request to its storage container's channel.
+ */
+public class StorageContainerStoreImpl
+    extends AbstractLifecycleComponent<StorageConfiguration>
+    implements StorageContainerStore {
+
+    private final StorageContainerManagerFactory scmFactory;
+    private final StorageContainerRegistryImpl scRegistry;
+    private final StorageContainerManager scManager;
+    private final StorageContainerServiceFactory serviceFactory;
+    private final Metadata.Key<Long> scIdKey;
+
+    public StorageContainerStoreImpl(StorageConfiguration conf,
+                                     StorageContainerManagerFactory managerFactory,
+                                     StorageContainerServiceFactory serviceFactory,
+                                     StatsLogger statsLogger) {
+        super("range-service", conf, statsLogger);
+        this.scmFactory = managerFactory;
+        this.scRegistry = new StorageContainerRegistryImpl(
+            new DefaultStorageContainerFactory(serviceFactory));
+        this.scManager = scmFactory.create(conf, scRegistry);
+        this.serviceFactory = serviceFactory;
+        this.scIdKey = Metadata.Key.of(
+            SC_ID_KEY,
+            LongBinaryMarshaller.of());
+    }
+
+    @Override
+    public StorageContainerRegistryImpl getRegistry() {
+        return this.scRegistry;
+    }
+
+    @Override
+    public StorageContainerRoutingService getRoutingService() {
+        return this.scManager;
+    }
+
+    //
+    // Lifecycle management
+    //
+
+    @Override
+    protected void doStart() {
+        this.scManager.start();
+    }
+
+    @Override
+    protected void doStop() {
+        this.scManager.stop();
+        this.scRegistry.close();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        this.scManager.close();
+        this.serviceFactory.close();
+    }
+
+    StorageContainer getStorageContainer(long scId) {
+        return scRegistry.getStorageContainer(scId);
+    }
+
+    //
+    // Utils for proxies
+    //
+
+    @Override
+    public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
+        Long scId = headers.get(scIdKey);
+        if (null == scId) {
+            // use the invalid storage container id, so it will fail the request.
+            scId = INVALID_STORAGE_CONTAINER_ID;
+        }
+        return getStorageContainer(scId).getChannel();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index 1e4f681..2fe81e7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -34,7 +34,7 @@ public class GrpcMetaRangeService extends MetaRangeServiceImplBase {
 
     private final RangeStoreService rangeStore;
 
-    public GrpcMetaRangeService(RangeStore service) {
+    public GrpcMetaRangeService(RangeStoreService service) {
         this.rangeStore = service;
         log.info("Created MetaRange service");
     }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java
new file mode 100644
index 0000000..8cf8d26
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.grpc;
+
+import com.google.common.collect.Lists;
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+
+/**
+ * Defines all the grpc services associated with a range store.
+ */
+public final class GrpcServices {
+
+    private GrpcServices() {}
+
+    /**
+     * Create the grpc services of a range store.
+     *
+     * @param store range store.
+     * @return the list of grpc services that should be associated with the range store.
+     */
+    public static Collection<ServerServiceDefinition> create(RangeStoreService store) {
+        return Lists.newArrayList(
+            new GrpcRootRangeService(store).bindService(),
+            new GrpcMetaRangeService(store).bindService(),
+            new GrpcTableService(store).bindService());
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java
new file mode 100644
index 0000000..4905aa8
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import javax.annotation.Nullable;
+
+/**
+ * A channel whose calls are always closed with {@link Status#NOT_FOUND}.
+ */
+final class Channel404 extends Channel {
+
+    static Channel404 of() {
+        return INSTANCE;
+    }
+
+    private static final Channel404 INSTANCE = new Channel404();
+
+    private static final ClientCall<Object, Object> CALL404 = new ClientCall<Object, Object>() {
+        @Override
+        public void start(Listener<Object> responseListener, Metadata headers) {
+            responseListener.onClose(Status.NOT_FOUND, new Metadata());
+        }
+
+        @Override
+        public void request(int numMessages) {
+            // no-op
+        }
+
+        @Override
+        public void cancel(@Nullable String message, @Nullable Throwable cause) {
+            // no-op
+        }
+
+        @Override
+        public void halfClose() {
+            // no-op
+        }
+
+        @Override
+        public void sendMessage(Object message) {
+            // no-op
+        }
+    };
+
+    private Channel404() {}
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+        MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+        return (ClientCall<RequestT, ResponseT>) CALL404;
+    }
+
+    @Override
+    public String authority() {
+        return null;
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
index 9fbf595..2073c0b 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
@@ -14,45 +14,24 @@
 
 package org.apache.bookkeeper.stream.storage.impl.sc;
 
-import java.net.URI;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
 
 /**
  * The default storage container factory for creating {@link StorageContainer}s.
  */
 public class DefaultStorageContainerFactory implements StorageContainerFactory {
 
-    private final StorageConfiguration storageConf;
-    private final StorageContainerPlacementPolicy rangePlacementPolicy;
-    private final OrderedScheduler scheduler;
-    private final MVCCStoreFactory storeFactory;
-    private final URI defaultBackendUri;
+    private final StorageContainerServiceFactory serviceFactory;
 
-    public DefaultStorageContainerFactory(StorageConfiguration storageConf,
-                                          StorageContainerPlacementPolicy rangePlacementPolicy,
-                                          OrderedScheduler scheduler,
-                                          MVCCStoreFactory storeFactory,
-                                          URI defaultBackendUri) {
-        this.storageConf = storageConf;
-        this.rangePlacementPolicy = rangePlacementPolicy;
-        this.scheduler = scheduler;
-        this.storeFactory = storeFactory;
-        this.defaultBackendUri = defaultBackendUri;
+    public DefaultStorageContainerFactory(StorageContainerServiceFactory serviceFactory) {
+        this.serviceFactory = serviceFactory;
     }
 
     @Override
     public StorageContainer createStorageContainer(long scId) {
         return new StorageContainerImpl(
-            storageConf,
-            scId,
-            rangePlacementPolicy,
-            scheduler,
-            storeFactory,
-            defaultBackendUri);
+            serviceFactory, scId);
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java
new file mode 100644
index 0000000..1d87c41
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+
+/**
+ * A storage container that always responds with {@link io.grpc.Status#NOT_FOUND}.
+ */
+final class StorageContainer404 implements StorageContainer {
+
+    static StorageContainer404 of() {
+        return INSTANCE;
+    }
+
+    private static final StorageContainer404 INSTANCE = new StorageContainer404();
+
+    private StorageContainer404() {}
+
+    @Override
+    public long getId() {
+        return -404;
+    }
+
+    @Override
+    public Channel getChannel() {
+        return Channel404.of();
+    }
+
+    @Override
+    public CompletableFuture<StorageContainer> start() {
+        return FutureUtils.value(this);
+    }
+
+    @Override
+    public CompletableFuture<Void> stop() {
+        return FutureUtils.Void();
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
index 52fa060..4bdc801 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
@@ -18,123 +18,47 @@
 
 package org.apache.bookkeeper.stream.storage.impl.sc;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
-
-import com.google.common.collect.Lists;
-import java.net.URI;
-import java.util.List;
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.protocol.RangeId;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
-import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
-import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.metadata.MetaRangeStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.metadata.MetaRangeStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
 
 /**
  * The default implementation of {@link StorageContainer}.
  */
 @Slf4j
-public class StorageContainerImpl
-    implements StorageContainer {
+class StorageContainerImpl implements StorageContainer {
 
+    private final String containerName;
+    private final StorageContainerService service;
     private final long scId;
+    private final Server grpcServer;
+    private volatile Channel channel = Channel404.of();
 
-    // store container that used for fail requests.
-    private final StorageContainer failRequestStorageContainer;
-    // store factory
-    private final MVCCStoreFactory storeFactory;
-    // storage container
-    @Getter(value = AccessLevel.PACKAGE)
-    private MetaRangeStore mgStore;
-    @Getter(value = AccessLevel.PACKAGE)
-    private final MetaRangeStoreFactory mrStoreFactory;
-    // root range
-    @Getter(value = AccessLevel.PACKAGE)
-    private RootRangeStore rootRange;
-    @Getter(value = AccessLevel.PACKAGE)
-    private final RootRangeStoreFactory rrStoreFactory;
-    // table range stores
-    @Getter(value = AccessLevel.PACKAGE)
-    private final TableStoreCache tableStoreCache;
-    @Getter(value = AccessLevel.PACKAGE)
-    private final TableStoreFactory tableStoreFactory;
-
-    public StorageContainerImpl(StorageConfiguration storageConf,
-                                long scId,
-                                StorageContainerPlacementPolicy rangePlacementPolicy,
-                                OrderedScheduler scheduler,
-                                MVCCStoreFactory storeFactory,
-                                URI defaultBackendUri) {
-        this(
-            scId,
-            scheduler,
-            storeFactory,
-            store -> new RootRangeStoreImpl(
-                defaultBackendUri, store, rangePlacementPolicy, scheduler.chooseThread(scId)),
-            store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId)),
-            store -> new TableStoreImpl(store));
+    StorageContainerImpl(StorageContainerServiceFactory serviceFactory, long scId) {
+        this.scId = scId;
+        this.service = serviceFactory.createStorageContainerService(scId);
+        this.containerName = "container-" + scId;
+        this.grpcServer = buildGrpcServer(containerName, service.getRegisteredServices());
     }
 
-    public StorageContainerImpl(long scId,
-                                OrderedScheduler scheduler,
-                                MVCCStoreFactory storeFactory,
-                                RootRangeStoreFactory rrStoreFactory,
-                                MetaRangeStoreFactory mrStoreFactory,
-                                TableStoreFactory tableStoreFactory) {
-        this.scId = scId;
-        this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
-        this.rootRange = failRequestStorageContainer;
-        this.mgStore = failRequestStorageContainer;
-        this.storeFactory = storeFactory;
-        this.rrStoreFactory = rrStoreFactory;
-        this.mrStoreFactory = mrStoreFactory;
-        this.tableStoreFactory = tableStoreFactory;
-        this.tableStoreCache = new TableStoreCache(storeFactory, tableStoreFactory);
+    private static Server buildGrpcServer(String serverName,
+                                          Collection<ServerServiceDefinition> services) {
+        InProcessServerBuilder builder = InProcessServerBuilder.forName(serverName);
+        services.forEach(service -> builder.addService(service));
+        return builder
+            .directExecutor()
+            .build();
     }
 
     //
@@ -146,42 +70,24 @@ public class StorageContainerImpl
         return scId;
     }
 
-    private CompletableFuture<Void> startRootRangeStore() {
-        if (ROOT_STORAGE_CONTAINER_ID != scId) {
-            return FutureUtils.Void();
-        }
-        return storeFactory.openStore(
-            ROOT_STORAGE_CONTAINER_ID,
-            ROOT_STREAM_ID,
-            ROOT_RANGE_ID
-        ).thenApply(store -> {
-            rootRange = rrStoreFactory.createStore(store);
-            return null;
-        });
-    }
-
-    private CompletableFuture<Void> startMetaRangeStore(long scId) {
-        return storeFactory.openStore(
-            scId,
-            CONTAINER_META_STREAM_ID,
-            CONTAINER_META_RANGE_ID
-        ).thenApply(store -> {
-            mgStore = mrStoreFactory.createStore(store);
-            return null;
-        });
-    }
-
     @Override
     public CompletableFuture<StorageContainer> start() {
         log.info("Starting storage container ({}) ...", getId());
 
-        List<CompletableFuture<Void>> futures = Lists.newArrayList(
-            startRootRangeStore(),
-            startMetaRangeStore(scId));
-
-        return FutureUtils.collect(futures).thenApply(ignored -> {
-            log.info("Successfully started storage container ({}).", getId());
-            return StorageContainerImpl.this;
+        return service.start().thenCompose(ignored -> {
+            try {
+                grpcServer.start();
+                log.info("Successfully started storage container ({}).", getId());
+
+                channel = InProcessChannelBuilder.forName(containerName)
+                    .usePlaintext()
+                    .directExecutor()
+                    .build();
+                return FutureUtils.value(StorageContainerImpl.this);
+            } catch (IOException e) {
+                log.error("Failed to start the grpc server for storage container ({})", getId(), e);
+                return FutureUtils.exception(e);
+            }
         });
     }
 
@@ -189,160 +95,40 @@ public class StorageContainerImpl
     public CompletableFuture<Void> stop() {
         log.info("Stopping storage container ({}) ...", getId());
 
-        return storeFactory.closeStores(scId).thenApply(ignored -> {
-            log.info("Successfully stopped storage container ({}).", getId());
-            return null;
-        });
-    }
+        Channel existingChannel = channel;
 
-    @Override
-    public void close() {
-        stop().join();
-    }
-
-    //
-    // Storage Container API
-    //
-
-    //
-    // Namespace API
-    //
-
-    @Override
-    public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
-        return rootRange.createNamespace(request);
-    }
+        // set channel to 404
+        channel = Channel404.of();
 
-    @Override
-    public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
-        return rootRange.deleteNamespace(request);
-    }
-
-    @Override
-    public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
-        return rootRange.getNamespace(request);
-    }
-
-    //
-    // Stream API.
-    //
-
-    @Override
-    public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
-        return rootRange.createStream(request);
-    }
-
-    @Override
-    public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
-        return rootRange.deleteStream(request);
-    }
+        if (null != existingChannel) {
+            if (existingChannel instanceof ManagedChannel) {
+                ((ManagedChannel) existingChannel).shutdown();
+            }
+        }
 
-    @Override
-    public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
-        return rootRange.getStream(request);
-    }
+        if (null != grpcServer) {
+            grpcServer.shutdown();
+        }
 
-    //
-    // Stream Meta Range API.
-    //
+        return service.stop().thenApply(ignored -> {
 
-    @Override
-    public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
-        return mgStore.getActiveRanges(request);
+            log.info("Successfully stopped storage container ({}).", getId());
+            return null;
+        });
     }
 
     //
-    // Table API
+    // Grpc
     //
 
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
-        checkArgument(KV_RANGE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        RangeRequest rr = request.getKvRangeReq();
-        RoutingHeader header = rr.getHeader();
-
-        RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
-        TableStore store = tableStoreCache.getTableStore(rid);
-        if (null != store) {
-            return store.range(request);
-        } else {
-            return tableStoreCache.openTableStore(scId, rid)
-                .thenCompose(s -> s.range(request));
-        }
-    }
-
-    @Override
-    public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
-        checkArgument(KV_PUT_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        PutRequest rr = request.getKvPutReq();
-        RoutingHeader header = rr.getHeader();
-
-        RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
-        TableStore store = tableStoreCache.getTableStore(rid);
-        if (null != store) {
-            return store.put(request);
-        } else {
-            return tableStoreCache.openTableStore(scId, rid)
-                .thenCompose(s -> s.put(request));
-        }
-    }
-
     @Override
-    public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
-        checkArgument(KV_DELETE_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        DeleteRangeRequest rr = request.getKvDeleteReq();
-        RoutingHeader header = rr.getHeader();
-
-        RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
-        TableStore store = tableStoreCache.getTableStore(rid);
-        if (null != store) {
-            return store.delete(request);
-        } else {
-            return tableStoreCache.openTableStore(scId, rid)
-                .thenCompose(s -> s.delete(request));
-        }
+    public Channel getChannel() {
+        return channel;
     }
 
     @Override
-    public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
-        checkArgument(KV_TXN_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        TxnRequest rr = request.getKvTxnReq();
-        RoutingHeader header = rr.getHeader();
-
-        RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
-        TableStore store = tableStoreCache.getTableStore(rid);
-        if (null != store) {
-            return store.txn(request);
-        } else {
-            return tableStoreCache.openTableStore(scId, rid)
-                .thenCompose(s -> s.txn(request));
-        }
+    public void close() {
+        stop().join();
     }
 
-    @Override
-    public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
-        checkArgument(KV_INCR_REQ == request.getRequestCase());
-
-        long scId = request.getScId();
-        IncrementRequest ir = request.getKvIncrReq();
-        RoutingHeader header = ir.getHeader();
-
-        RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
-        TableStore store = tableStoreCache.getTableStore(rid);
-        if (null != store) {
-            return store.incr(request);
-        } else {
-            return tableStoreCache.openTableStore(scId, rid)
-                .thenCompose(s -> s.incr(request));
-        }
-    }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 4499801..48330b7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.exceptions.ObjectClosedException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
@@ -39,13 +38,10 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
     private final StorageContainerFactory scFactory;
     private final ConcurrentMap<Long, StorageContainer> containers;
     private final ReentrantReadWriteLock closeLock;
-    private final StorageContainer failRequestStorageContainer;
     private boolean closed = false;
 
-    public StorageContainerRegistryImpl(StorageContainerFactory factory,
-                                        OrderedScheduler scheduler) {
+    public StorageContainerRegistryImpl(StorageContainerFactory factory) {
         this.scFactory = factory;
-        this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
         this.containers = Maps.newConcurrentMap();
         this.closeLock = new ReentrantReadWriteLock();
     }
@@ -62,7 +58,7 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
 
     @Override
     public StorageContainer getStorageContainer(long storageContainerId) {
-        return containers.getOrDefault(storageContainerId, failRequestStorageContainer);
+        return containers.getOrDefault(storageContainerId, StorageContainer404.of());
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
similarity index 85%
rename from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
index 8217e89..e15b808 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
 
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 
 import io.grpc.Status;
@@ -40,49 +39,39 @@ import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
  * It is a single-ton implementation that fails all requests.
  */
-public final class FailRequestStorageContainer implements StorageContainer {
+final class FailRequestRangeStoreService implements RangeStoreService {
 
-    public static StorageContainer of(OrderedScheduler scheduler) {
-        return new FailRequestStorageContainer(scheduler);
+    static RangeStoreService of(OrderedScheduler scheduler) {
+        return new FailRequestRangeStoreService(scheduler);
     }
 
     private final OrderedScheduler scheduler;
 
-    private FailRequestStorageContainer(OrderedScheduler scheduler) {
+    private FailRequestRangeStoreService(OrderedScheduler scheduler) {
         this.scheduler = scheduler;
     }
 
-    @Override
-    public long getId() {
-        return INVALID_STORAGE_CONTAINER_ID;
+    private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
+        CompletableFuture<T> future = FutureUtils.createFuture();
+        scheduler.executeOrdered(scId, () -> {
+            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
+        });
+        return future;
     }
 
     @Override
-    public CompletableFuture<StorageContainer> start() {
-        return CompletableFuture.completedFuture(this);
+    public CompletableFuture<Void> start() {
+        return FutureUtils.Void();
     }
 
     @Override
     public CompletableFuture<Void> stop() {
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public void close() {
-        // no-op
-    }
-
-    private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
-        CompletableFuture<T> future = FutureUtils.createFuture();
-        scheduler.executeOrdered(scId, () -> {
-            future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
-        });
-        return future;
+        return FutureUtils.Void();
     }
 
     //
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java
new file mode 100644
index 0000000..77a7950
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
+
+/**
+ * The default factory for creating {@link StorageContainerService}s hosted by {@link StorageContainer}s.
+ */
+public class RangeStoreContainerServiceFactoryImpl implements StorageContainerServiceFactory {
+
+    private final RangeStoreServiceFactory serviceFactory;
+
+    public RangeStoreContainerServiceFactoryImpl(RangeStoreServiceFactory serviceFactory) {
+        this.serviceFactory = serviceFactory;
+    }
+
+    @Override
+    public StorageContainerService createStorageContainerService(long scId) {
+        return new RangeStoreContainerServiceImpl(serviceFactory.createService(scId));
+    }
+
+    @Override
+    public void close() {
+        serviceFactory.close();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java
new file mode 100644
index 0000000..8a8f6d3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
+
+/**
+ * Implement the range service that runs on one storage container.
+ */
+class RangeStoreContainerServiceImpl implements StorageContainerService {
+
+    private final RangeStoreService store;
+    private final Collection<ServerServiceDefinition> grpcServices;
+
+    RangeStoreContainerServiceImpl(RangeStoreService service) {
+        this.store = service;
+        this.grpcServices = GrpcServices.create(service);
+    }
+
+    @Override
+    public Collection<ServerServiceDefinition> getRegisteredServices() {
+        return grpcServices;
+    }
+
+    @Override
+    public CompletableFuture<Void> start() {
+        return store.start();
+    }
+
+    @Override
+    public CompletableFuture<Void> stop() {
+        return store.stop();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
new file mode 100644
index 0000000..02dd8a3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import java.net.URI;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SharedResourceManager;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+
+/**
+ * Default implementation of {@link RangeStoreServiceFactory}.
+ */
+public class RangeStoreServiceFactoryImpl implements RangeStoreServiceFactory {
+
+    private final StorageConfiguration storageConf;
+    private final StorageContainerPlacementPolicy rangePlacementPolicy;
+    private final Resource<OrderedScheduler> schedulerResource;
+    private final OrderedScheduler scheduler;
+    private final MVCCStoreFactory storeFactory;
+    private final URI defaultBackendUri;
+
+    public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
+                                        StorageContainerPlacementPolicy rangePlacementPolicy,
+                                        Resource<OrderedScheduler> schedulerResource,
+                                        MVCCStoreFactory storeFactory,
+                                        URI defaultBackendUri) {
+        this.storageConf = storageConf;
+        this.rangePlacementPolicy = rangePlacementPolicy;
+        this.schedulerResource = schedulerResource;
+        this.scheduler = SharedResourceManager.shared().get(schedulerResource);
+        this.storeFactory = storeFactory;
+        this.defaultBackendUri = defaultBackendUri;
+    }
+
+    @Override
+    public RangeStoreService createService(long scId) {
+        return new RangeStoreServiceImpl(
+            storageConf,
+            scId,
+            rangePlacementPolicy,
+            scheduler,
+            storeFactory,
+            defaultBackendUri);
+    }
+
+    @Override
+    public void close() {
+        SharedResourceManager.shared().release(schedulerResource, scheduler);
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
similarity index 86%
copy from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
copy to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 52fa060..0304fc8 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
@@ -63,8 +63,8 @@ import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
@@ -76,16 +76,13 @@ import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
- * The default implementation of {@link StorageContainer}.
+ * The service implementation running in a storage container.
  */
 @Slf4j
-public class StorageContainerImpl
-    implements StorageContainer {
+class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
 
     private final long scId;
 
-    // store container that used for fail requests.
-    private final StorageContainer failRequestStorageContainer;
     // store factory
     private final MVCCStoreFactory storeFactory;
     // storage container
@@ -104,12 +101,12 @@ public class StorageContainerImpl
     @Getter(value = AccessLevel.PACKAGE)
     private final TableStoreFactory tableStoreFactory;
 
-    public StorageContainerImpl(StorageConfiguration storageConf,
-                                long scId,
-                                StorageContainerPlacementPolicy rangePlacementPolicy,
-                                OrderedScheduler scheduler,
-                                MVCCStoreFactory storeFactory,
-                                URI defaultBackendUri) {
+    RangeStoreServiceImpl(StorageConfiguration storageConf,
+                          long scId,
+                          StorageContainerPlacementPolicy rangePlacementPolicy,
+                          OrderedScheduler scheduler,
+                          MVCCStoreFactory storeFactory,
+                          URI defaultBackendUri) {
         this(
             scId,
             scheduler,
@@ -120,14 +117,15 @@ public class StorageContainerImpl
             store -> new TableStoreImpl(store));
     }
 
-    public StorageContainerImpl(long scId,
-                                OrderedScheduler scheduler,
-                                MVCCStoreFactory storeFactory,
-                                RootRangeStoreFactory rrStoreFactory,
-                                MetaRangeStoreFactory mrStoreFactory,
-                                TableStoreFactory tableStoreFactory) {
+    RangeStoreServiceImpl(long scId,
+                          OrderedScheduler scheduler,
+                          MVCCStoreFactory storeFactory,
+                          RootRangeStoreFactory rrStoreFactory,
+                          MetaRangeStoreFactory mrStoreFactory,
+                          TableStoreFactory tableStoreFactory) {
         this.scId = scId;
-        this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
+        RangeStoreService failRequestStorageContainer =
+            FailRequestRangeStoreService.of(scheduler);
         this.rootRange = failRequestStorageContainer;
         this.mgStore = failRequestStorageContainer;
         this.storeFactory = storeFactory;
@@ -141,7 +139,6 @@ public class StorageContainerImpl
     // Services
     //
 
-    @Override
     public long getId() {
         return scId;
     }
@@ -171,28 +168,16 @@ public class StorageContainerImpl
         });
     }
 
-    @Override
-    public CompletableFuture<StorageContainer> start() {
-        log.info("Starting storage container ({}) ...", getId());
-
+    public CompletableFuture<Void> start() {
         List<CompletableFuture<Void>> futures = Lists.newArrayList(
             startRootRangeStore(),
             startMetaRangeStore(scId));
 
-        return FutureUtils.collect(futures).thenApply(ignored -> {
-            log.info("Successfully started storage container ({}).", getId());
-            return StorageContainerImpl.this;
-        });
+        return FutureUtils.collect(futures).thenApply(ignored -> null);
     }
 
-    @Override
     public CompletableFuture<Void> stop() {
-        log.info("Stopping storage container ({}) ...", getId());
-
-        return storeFactory.closeStores(scId).thenApply(ignored -> {
-            log.info("Successfully stopped storage container ({}).", getId());
-            return null;
-        });
+        return storeFactory.closeStores(scId);
     }
 
     @Override
@@ -345,4 +330,5 @@ public class StorageContainerImpl
                 .thenCompose(s -> s.incr(request));
         }
     }
+
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java
new file mode 100644
index 0000000..68ad094
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes that actually implement the storage services.
+ */
+package org.apache.bookkeeper.stream.storage.impl.service;
\ No newline at end of file
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
similarity index 84%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 85d3ef0..3d3a0ad 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,18 +18,18 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.net.URI;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Unit test for {@link RangeStoreBuilder}.
+ * Unit test for {@link StorageContainerStoreBuilder}.
  */
-public class TestRangeStoreBuilder {
+public class TestStorageContainerStoreBuilder {
 
     private MVCCStoreFactory storeFactory;
     private final URI uri = URI.create("distributedlog://127.0.0.1/stream/storage");
@@ -41,7 +41,7 @@ public class TestRangeStoreBuilder {
 
     @Test(expected = NullPointerException.class)
     public void testBuildNullConfiguration() {
-        RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(null)
             .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
@@ -52,7 +52,7 @@ public class TestRangeStoreBuilder {
 
     @Test(expected = NullPointerException.class)
     public void testBuildNullResources() {
-        RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(mock(StorageConfiguration.class))
             .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(null)
@@ -63,7 +63,7 @@ public class TestRangeStoreBuilder {
 
     @Test(expected = NullPointerException.class)
     public void testBuildNullRGManagerFactory() {
-        RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(mock(StorageConfiguration.class))
             .withStorageContainerManagerFactory(null)
             .withStorageResources(StorageResources.create())
@@ -74,7 +74,7 @@ public class TestRangeStoreBuilder {
 
     @Test(expected = NullPointerException.class)
     public void testBuildNullStoreFactory() {
-        RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(mock(StorageConfiguration.class))
             .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
@@ -85,7 +85,7 @@ public class TestRangeStoreBuilder {
 
     @Test(expected = NullPointerException.class)
     public void testBuildNullDefaultBackendUri() {
-        RangeStoreBuilder.newBuilder()
+        StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(mock(StorageConfiguration.class))
             .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
@@ -96,14 +96,14 @@ public class TestRangeStoreBuilder {
 
     @Test
     public void testBuild() {
-        RangeStore rangeStore = RangeStoreBuilder.newBuilder()
+        StorageContainerStore storageContainerStore = StorageContainerStoreBuilder.newBuilder()
             .withStorageConfiguration(mock(StorageConfiguration.class))
             .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(uri)
             .build();
-        assertTrue(rangeStore instanceof RangeStoreImpl);
+        assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
     }
 
 }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
similarity index 69%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index ac4bb78..a269466 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -14,6 +14,7 @@
 
 package org.apache.bookkeeper.stream.storage.impl;
 
+import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenableFuture;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
@@ -33,14 +34,24 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.protobuf.ByteString;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
-import java.net.URI;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamName;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
@@ -65,14 +76,21 @@ import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceFutureStub;
+import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
-import org.apache.bookkeeper.stream.storage.StorageResources;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
 import org.apache.bookkeeper.stream.storage.impl.sc.LocalStorageContainerManager;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreContainerServiceFactoryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.junit.After;
@@ -80,10 +98,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Unit test of {@link RangeStoreImpl}.
+ * Unit test of {@link StorageContainerStoreImpl}.
  */
 @Slf4j
-public class TestRangeStoreImpl {
+public class TestStorageContainerStoreImpl {
 
     private static final StreamProperties streamProps = StreamProperties.newBuilder()
         .setStorageContainerId(System.currentTimeMillis())
@@ -115,8 +133,13 @@ public class TestRangeStoreImpl {
         NamespaceConfiguration.newBuilder()
             .setDefaultStreamConf(DEFAULT_STREAM_CONF)
             .build();
-    private StorageResources storageResources;
-    private RangeStoreImpl rangeStore;
+    private RangeStoreService mockRangeStoreService;
+    private StorageContainerStoreImpl rangeStore;
+    private Server server;
+    private Channel channel;
+    private TableServiceFutureStub tableService;
+    private RootRangeServiceFutureStub rootRangeService;
+    private MetaRangeServiceFutureStub metaRangeService;
 
     //
     // Utils for table api
@@ -196,8 +219,6 @@ public class TestRangeStoreImpl {
     @SuppressWarnings("unchecked")
     @Before
     public void setUp() throws Exception {
-        storageResources = StorageResources.create();
-
         Endpoint endpoint = createEndpoint("127.0.0.1", 0);
 
         // create the client manager
@@ -208,19 +229,61 @@ public class TestRangeStoreImpl {
         when(storeFactory.closeStores(anyLong()))
             .thenReturn(FutureUtils.Void());
 
-        rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
-            .withStorageConfiguration(storageConf)
-            .withStorageResources(storageResources)
-            .withStorageContainerManagerFactory((storeConf, rgRegistry)
-                -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
-            .withRangeStoreFactory(storeFactory)
-            .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
-            .build();
+        RangeStoreServiceFactory rangeStoreServiceFactory = mock(RangeStoreServiceFactory.class);
+        mockRangeStoreService = mock(RangeStoreService.class);
+        when(mockRangeStoreService.start()).thenReturn(FutureUtils.Void());
+        when(mockRangeStoreService.stop()).thenReturn(FutureUtils.Void());
+        when(rangeStoreServiceFactory.createService(anyLong()))
+            .thenReturn(mockRangeStoreService);
+
+        rangeStore = new StorageContainerStoreImpl(
+            storageConf,
+            (storeConf, rgRegistry)
+                -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2),
+            new RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+            NullStatsLogger.INSTANCE);
+
         rangeStore.start();
+
+        final String serverName = "test-server";
+
+        Collection<ServerServiceDefinition> grpcServices = GrpcServices.create(null);
+        ProxyHandlerRegistry.Builder registryBuilder = ProxyHandlerRegistry.newBuilder();
+        grpcServices.forEach(service -> registryBuilder.addService(service));
+        ProxyHandlerRegistry registry = registryBuilder
+            .setChannelFinder(rangeStore)
+            .build();
+        server = InProcessServerBuilder.forName(serverName)
+            .fallbackHandlerRegistry(registry)
+            .directExecutor()
+            .build()
+            .start();
+
+        channel = InProcessChannelBuilder.forName(serverName)
+            .usePlaintext()
+            .build();
+
+        // intercept the channel with storage container information.
+        channel = ClientInterceptors.intercept(
+            channel,
+            new StorageContainerClientInterceptor(0L));
+
+
+        tableService = TableServiceGrpc.newFutureStub(channel);
+        metaRangeService = MetaRangeServiceGrpc.newFutureStub(channel);
+        rootRangeService = RootRangeServiceGrpc.newFutureStub(channel);
     }
 
     @After
     public void tearDown() {
+        if (null != channel) {
+            if (channel instanceof ManagedChannel) {
+                ((ManagedChannel) channel).shutdown();
+            }
+        }
+        if (null != server) {
+            server.shutdown();
+        }
         if (null != rangeStore) {
             rangeStore.close();
         }
@@ -247,8 +310,8 @@ public class TestRangeStoreImpl {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
         String colName = "test-create-namespace-no-root-storage-container-store";
-        verifyNotFoundException(
-            rangeStore.createNamespace(createCreateNamespaceRequest(colName, namespaceConf)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.createNamespace(createCreateNamespaceRequest(colName, namespaceConf))),
             Status.NOT_FOUND);
     }
 
@@ -257,8 +320,8 @@ public class TestRangeStoreImpl {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
-        verifyNotFoundException(
-            rangeStore.deleteNamespace(createDeleteNamespaceRequest(colName)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.deleteNamespace(createDeleteNamespaceRequest(colName))),
             Status.NOT_FOUND);
     }
 
@@ -267,8 +330,8 @@ public class TestRangeStoreImpl {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
         String colName = "test-get-namespace-no-root-storage-container-store";
-        verifyNotFoundException(
-            rangeStore.getNamespace(createGetNamespaceRequest(colName)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.getNamespace(createGetNamespaceRequest(colName))),
             Status.NOT_FOUND);
     }
 
@@ -276,41 +339,35 @@ public class TestRangeStoreImpl {
     public void testCreateNamespaceMockRootStorageContainerStore() throws Exception {
         String colName = "test-create-namespace-mock-root-storage-container-store";
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
             .setCode(StatusCode.NAMESPACE_EXISTS)
             .build();
         CreateNamespaceRequest request = createCreateNamespaceRequest(colName, namespaceConf);
 
-        when(scStore.createNamespace(request))
+        when(mockRangeStoreService.createNamespace(request))
             .thenReturn(CompletableFuture.completedFuture(createResp));
 
         CompletableFuture<CreateNamespaceResponse> createRespFuture =
-            rangeStore.createNamespace(request);
-        verify(scStore, times(1)).createNamespace(request);
+            fromListenableFuture(rootRangeService.createNamespace(request));
         assertTrue(createResp == createRespFuture.get());
+        verify(mockRangeStoreService, times(1)).createNamespace(request);  // only after get(): RPC has completed
     }
 
     @Test
     public void testDeleteNamespaceMockRootStorageContainerStore() throws Exception {
         String colName = "test-delete-namespace-no-root-storage-container-store";
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
             .setCode(StatusCode.NAMESPACE_NOT_FOUND)
             .build();
         DeleteNamespaceRequest request = createDeleteNamespaceRequest(colName);
 
-        when(scStore.deleteNamespace(request))
+        when(mockRangeStoreService.deleteNamespace(request))
             .thenReturn(CompletableFuture.completedFuture(deleteResp));
 
         CompletableFuture<DeleteNamespaceResponse> deleteRespFuture =
-            rangeStore.deleteNamespace(request);
-        verify(scStore, times(1)).deleteNamespace(request);
+            fromListenableFuture(rootRangeService.deleteNamespace(request));
         assertTrue(deleteResp == deleteRespFuture.get());
+        verify(mockRangeStoreService, times(1)).deleteNamespace(request);  // only after get(): RPC has completed
     }
 
@@ -318,20 +375,17 @@ public class TestRangeStoreImpl {
     public void testGetNamespaceMockRootStorageContainerStore() throws Exception {
         String colName = "test-get-namespace-no-root-storage-container-store";
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
             .setCode(StatusCode.NAMESPACE_NOT_FOUND)
             .build();
         GetNamespaceRequest request = createGetNamespaceRequest(colName);
 
-        when(scStore.getNamespace(request)).thenReturn(
+        when(mockRangeStoreService.getNamespace(request)).thenReturn(
             CompletableFuture.completedFuture(getResp));
 
         CompletableFuture<GetNamespaceResponse> getRespFuture =
-            rangeStore.getNamespace(request);
-        verify(scStore, times(1)).getNamespace(request);
+            fromListenableFuture(rootRangeService.getNamespace(request));
         assertTrue(getResp == getRespFuture.get());
+        verify(mockRangeStoreService, times(1)).getNamespace(request);  // only after get(): RPC has completed
     }
 
@@ -345,8 +399,8 @@ public class TestRangeStoreImpl {
 
         String colName = "test-create-namespace-no-root-storage-container-store";
         String streamName = colName;
-        verifyNotFoundException(
-            rangeStore.createStream(createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.createStream(createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF))),
             Status.NOT_FOUND);
     }
 
@@ -356,8 +410,8 @@ public class TestRangeStoreImpl {
 
         String colName = "test-delete-namespace-no-root-storage-container-store";
         String streamName = colName;
-        verifyNotFoundException(
-            rangeStore.deleteStream(createDeleteStreamRequest(colName, streamName)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.deleteStream(createDeleteStreamRequest(colName, streamName))),
             Status.NOT_FOUND);
     }
 
@@ -367,8 +421,8 @@ public class TestRangeStoreImpl {
 
         String colName = "test-get-namespace-no-root-storage-container-store";
         String streamName = colName;
-        verifyNotFoundException(
-            rangeStore.getStream(createGetStreamRequest(colName, streamName)),
+        verifyNotFoundException(fromListenableFuture(
+            rootRangeService.getStream(createGetStreamRequest(colName, streamName))),
             Status.NOT_FOUND);
     }
 
@@ -377,19 +431,16 @@ public class TestRangeStoreImpl {
         String colName = "test-create-namespace-mock-root-storage-container-store";
         String streamName = colName;
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
             .setCode(StatusCode.STREAM_EXISTS)
             .build();
         CreateStreamRequest createReq = createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF);
-        when(scStore.createStream(createReq)).thenReturn(
+        when(mockRangeStoreService.createStream(createReq)).thenReturn(
             CompletableFuture.completedFuture(createResp));
 
         CompletableFuture<CreateStreamResponse> createRespFuture =
-            rangeStore.createStream(createReq);
-        verify(scStore, times(1)).createStream(createReq);
+            fromListenableFuture(rootRangeService.createStream(createReq));
+        verify(mockRangeStoreService, times(1)).createStream(createReq);
         assertTrue(createResp == createRespFuture.get());
     }
 
@@ -398,19 +449,16 @@ public class TestRangeStoreImpl {
         String colName = "test-delete-namespace-no-root-storage-container-store";
         String streamName = colName;
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
             .setCode(StatusCode.STREAM_NOT_FOUND)
             .build();
         DeleteStreamRequest deleteReq = createDeleteStreamRequest(colName, streamName);
-        when(scStore.deleteStream(deleteReq)).thenReturn(
+        when(mockRangeStoreService.deleteStream(deleteReq)).thenReturn(
             CompletableFuture.completedFuture(deleteResp));
 
         CompletableFuture<DeleteStreamResponse> deleteRespFuture =
-            rangeStore.deleteStream(deleteReq);
-        verify(scStore, times(1)).deleteStream(deleteReq);
+            fromListenableFuture(rootRangeService.deleteStream(deleteReq));
+        verify(mockRangeStoreService, times(1)).deleteStream(deleteReq);
         assertTrue(deleteResp == deleteRespFuture.get());
     }
 
@@ -419,26 +467,25 @@ public class TestRangeStoreImpl {
         String colName = "test-get-namespace-no-root-storage-container-store";
         String streamName = colName;
 
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         GetStreamResponse getResp = GetStreamResponse.newBuilder()
             .setCode(StatusCode.STREAM_NOT_FOUND)
             .build();
         GetStreamRequest getReq = createGetStreamRequest(colName, streamName);
-        when(scStore.getStream(getReq)).thenReturn(
+        when(mockRangeStoreService.getStream(getReq)).thenReturn(
             CompletableFuture.completedFuture(getResp));
 
         CompletableFuture<GetStreamResponse> getRespFuture =
-            rangeStore.getStream(getReq);
-        verify(scStore, times(1)).getStream(getReq);
+            fromListenableFuture(rootRangeService.getStream(getReq));
+        verify(mockRangeStoreService, times(1)).getStream(getReq);
         assertTrue(getResp == getRespFuture.get());
     }
 
     @Test
     public void testGetActiveRangesNoManager() throws Exception {
-        verifyNotFoundException(
-            rangeStore.getActiveRanges(createGetActiveRangesRequest(12L, 34L)),
+        rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+
+        verifyNotFoundException(fromListenableFuture(
+            metaRangeService.getActiveRanges(createGetActiveRangesRequest(12L, 34L))),
             Status.NOT_FOUND);
     }
 
@@ -446,25 +493,17 @@ public class TestRangeStoreImpl {
     public void testGetActiveRangesMockManager() throws Exception {
         long scId = System.currentTimeMillis();
 
-        StreamProperties props = StreamProperties.newBuilder(streamProps)
-            .setStorageContainerId(scId)
-            .build();
-
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(scId, scStore);
-
         StorageContainerResponse resp = StorageContainerResponse.newBuilder()
             .setCode(StatusCode.STREAM_NOT_FOUND)
             .build();
         StorageContainerRequest request = createGetActiveRangesRequest(scId, 34L);
 
-        when(scStore.getActiveRanges(request))
+        when(mockRangeStoreService.getActiveRanges(request))
             .thenReturn(CompletableFuture.completedFuture(resp));
 
-        CompletableFuture<StorageContainerResponse> future =
-            rangeStore.getActiveRanges(request);
-        verify(scStore, times(1)).getActiveRanges(request);
+        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+            metaRangeService.getActiveRanges(request));
+        verify(mockRangeStoreService, times(1)).getActiveRanges(request);
         assertTrue(resp == future.get());
     }
 
@@ -477,8 +516,8 @@ public class TestRangeStoreImpl {
     public void testPutNoStorageContainer() throws Exception {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
-        verifyNotFoundException(
-            rangeStore.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID)),
+        verifyNotFoundException(fromListenableFuture(
+            tableService.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID))),
             Status.NOT_FOUND);
     }
 
@@ -486,8 +525,8 @@ public class TestRangeStoreImpl {
     public void testDeleteNoStorageContainer() throws Exception {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
-        verifyNotFoundException(
-            rangeStore.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID)),
+        verifyNotFoundException(fromListenableFuture(
+            tableService.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID))),
             Status.NOT_FOUND);
     }
 
@@ -495,59 +534,50 @@ public class TestRangeStoreImpl {
     public void testRangeNoStorageContainer() throws Exception {
         rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
 
-        verifyNotFoundException(
-            rangeStore.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID)),
+        verifyNotFoundException(fromListenableFuture(
+            tableService.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID))),
             Status.NOT_FOUND);
     }
 
     @Test
     public void testRangeMockStorageContainer() throws Exception {
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         StorageContainerResponse response = createRangeResponse(StatusCode.SUCCESS);
         StorageContainerRequest request = createRangeRequest(ROOT_STORAGE_CONTAINER_ID);
 
-        when(scStore.range(request))
+        when(mockRangeStoreService.range(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future =
-            rangeStore.range(request);
-        verify(scStore, times(1)).range(eq(request));
+        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+            tableService.range(request));
+        verify(mockRangeStoreService, times(1)).range(eq(request));
         assertTrue(response == future.get());
     }
 
     @Test
     public void testDeleteMockStorageContainer() throws Exception {
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         StorageContainerResponse response = createDeleteResponse(StatusCode.SUCCESS);
         StorageContainerRequest request = createDeleteRequest(ROOT_STORAGE_CONTAINER_ID);
 
-        when(scStore.delete(request))
+        when(mockRangeStoreService.delete(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future =
-            rangeStore.delete(request);
-        verify(scStore, times(1)).delete(eq(request));
+        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+            tableService.delete(request));
+        verify(mockRangeStoreService, times(1)).delete(eq(request));
         assertTrue(response == future.get());
     }
 
     @Test
     public void testPutMockStorageContainer() throws Exception {
-        StorageContainer scStore = mock(StorageContainer.class);
-        when(scStore.stop()).thenReturn(FutureUtils.value(null));
-        rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
         StorageContainerResponse response = createPutResponse(StatusCode.SUCCESS);
         StorageContainerRequest request = createPutRequest(ROOT_STORAGE_CONTAINER_ID);
 
-        when(scStore.put(request))
+        when(mockRangeStoreService.put(request))
             .thenReturn(CompletableFuture.completedFuture(response));
 
-        CompletableFuture<StorageContainerResponse> future =
-            rangeStore.put(request);
-        verify(scStore, times(1)).put(eq(request));
+        CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+            tableService.put(request));
+        verify(mockRangeStoreService, times(1)).put(eq(request));
         assertTrue(response == future.get());
     }
 
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 787453f..178ac19 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
 /**
@@ -48,7 +48,7 @@ public class TestGrpcMetaRangeService {
 
     @Test
     public void testGetActiveRangesSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -79,7 +79,7 @@ public class TestGrpcMetaRangeService {
 
     @Test
     public void testGetActiveRangesFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -109,7 +109,7 @@ public class TestGrpcMetaRangeService {
 
     @Test
     public void testGetActiveRangesException() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
index 7970793..8a6c7a9 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
@@ -70,8 +70,8 @@ import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.exceptions.StorageException;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
 import org.junit.Test;
 
 /**
@@ -108,7 +108,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testCreateNamespaceSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -152,7 +152,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testCreateNamespaceFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -195,7 +195,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testDeleteNamespaceSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -237,7 +237,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testDeleteNamespaceFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -279,7 +279,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testGetNamespaceSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -322,7 +322,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testGetNamespaceFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -368,7 +368,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testCreateStreamSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -413,7 +413,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testCreateStreamFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -457,7 +457,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testDeleteStreamSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -500,7 +500,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testDeleteStreamFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -543,7 +543,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testGetStreamSuccess() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         GetStreamResponse getResp = GetStreamResponse.newBuilder()
             .setCode(StatusCode.SUCCESS)
@@ -588,7 +588,7 @@ public class TestGrpcRootRangeService {
 
     @Test
     public void testGetStreamFailure() throws Exception {
-        RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
         GetStreamResponse getResp = GetStreamResponse.newBuilder()
             .setCode(StatusCode.INTERNAL_SERVER_ERROR)
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index ed97dc1..e943fb4 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.junit.Test;
 
 /**
@@ -63,7 +63,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testPutSuccess() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -95,7 +95,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testPutFailure() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -126,7 +126,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testPutException() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -153,7 +153,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testRangeSuccess() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -184,7 +184,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testRangeActiveRangesFailure() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -214,7 +214,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testRangeActiveRangesException() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -240,7 +240,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testDeleteSuccess() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -271,7 +271,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testDeleteFailure() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
@@ -301,7 +301,7 @@ public class TestGrpcTableService {
 
     @Test
     public void testDeleteException() throws Exception {
-        RangeStore rangeService = mock(RangeStore.class);
+        RangeStoreService rangeService = mock(RangeStoreService.class);
         GrpcTableService grpcService = new GrpcTableService(rangeService);
 
         StorageContainerRequest request = StorageContainerRequest
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
index ed87fa6..a56623b 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
@@ -16,23 +16,14 @@ package org.apache.bookkeeper.stream.storage.impl.sc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
-import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 /**
  * Unit test for {@link DefaultStorageContainerFactory}.
@@ -40,23 +31,15 @@ import org.mockito.Mockito;
 public class TestDefaultStorageContainerFactory {
 
     @Test
-    public void testCreate() throws Exception {
-        OrderedScheduler scheduler = mock(OrderedScheduler.class);
-        OrderedScheduler snapshotScheduler = mock(OrderedScheduler.class);
-        MVCCStoreFactory storeFactory = mock(MVCCStoreFactory.class);
-        ListeningScheduledExecutorService snapshotExecutor = mock(ListeningScheduledExecutorService.class);
-        when(snapshotScheduler.chooseThread(anyLong())).thenReturn(snapshotExecutor);
-        Mockito.doReturn(mock(ListenableScheduledFuture.class))
-            .when(snapshotExecutor).scheduleWithFixedDelay(
-            any(Runnable.class), anyInt(), anyInt(), any(TimeUnit.class));
+    public void testCreate() {
+        StorageContainerServiceFactory mockServiceFactory =
+            mock(StorageContainerServiceFactory.class);
+        StorageContainerService mockService = mock(StorageContainerService.class);
 
+        when(mockServiceFactory.createStorageContainerService(anyLong()))
+            .thenReturn(mockService);
 
-        DefaultStorageContainerFactory factory = new DefaultStorageContainerFactory(
-            new StorageConfiguration(new CompositeConfiguration()),
-            (streamId, rangeId) -> streamId,
-            scheduler,
-            storeFactory,
-            URI.create("distributedlog://127.0.0.1/stream/storage"));
+        DefaultStorageContainerFactory factory = new DefaultStorageContainerFactory(mockServiceFactory);
         StorageContainer sc = factory.createStorageContainer(1234L);
         assertTrue(sc instanceof StorageContainerImpl);
         assertEquals(1234L, sc.getId());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
index 7031629..837dbe2 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
@@ -11,21 +11,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.bookkeeper.stream.storage.impl.sc;
 
 import static org.junit.Assert.assertEquals;
@@ -81,7 +66,7 @@ public class TestStorageContainerRegistryImpl {
     @Test
     public void testOperationsAfterClosed() throws Exception {
         StorageContainerFactory scFactory = createStorageContainerFactory();
-        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
         registry.close();
 
         long scId = 1234L;
@@ -106,7 +91,7 @@ public class TestStorageContainerRegistryImpl {
     @Test
     public void testStopNotFoundStorageContainer() throws Exception {
         StorageContainerFactory scFactory = createStorageContainerFactory();
-        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
         FutureUtils.result(registry.stopStorageContainer(1234L));
         assertEquals(0, registry.getNumStorageContainers());
     }
@@ -114,7 +99,7 @@ public class TestStorageContainerRegistryImpl {
     @Test
     public void testStartStorageContainerTwice() throws Exception {
         StorageContainerFactory scFactory = createStorageContainerFactory();
-        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
         FutureUtils.result(registry.startStorageContainer(1234L));
         assertEquals(1, registry.getNumStorageContainers());
         // second time
@@ -140,7 +125,7 @@ public class TestStorageContainerRegistryImpl {
 
         long scId = 1L;
 
-        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(factory, scheduler);
+        StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(factory);
         FutureUtils.result(registry.startStorageContainer(scId));
         assertEquals(1, registry.getNumStorageContainers());
         assertEquals(sc1, registry.getStorageContainer(scId));
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
index 193503b..3a5a4fd 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.testing.MoreAsserts;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
 import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
@@ -76,7 +75,6 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
     private StorageContainerRegistry scRegistry;
     private ZkClusterMetadataStore clusterMetadataStore;
     private ZkStorageContainerManager scManager;
-    private OrderedScheduler scheduler;
 
     @Before
     public void setup() {
@@ -89,13 +87,8 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
             curatorClient, zkServers, "/" + runtime.getMethodName()));
         clusterMetadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
 
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("test-scheduler")
-            .numThreads(1)
-            .build();
-
         mockScFactory = mock(StorageContainerFactory.class);
-        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory));
 
         scManager = new ZkStorageContainerManager(
             myEndpoint,
@@ -112,10 +105,6 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
             scManager.close();
         }
 
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
-
         if (null != curatorClient) {
             curatorClient.close();
         }
@@ -278,7 +267,7 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
                 );
             }
         };
-        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory));
 
         scManager = new ZkStorageContainerManager(
             myEndpoint,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
similarity index 98%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
index e2a582b..a4d8710 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
 
 import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
 import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
@@ -73,9 +73,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Unit test of {@link StorageContainerImpl}.
+ * Unit test of {@link RangeStoreServiceImpl}.
  */
-public class StorageContainerImplTest {
+public class RangeStoreServiceImplTest {
 
     private static final long SCID = 3456L;
     private static final long STREAM_ID = 1234L;
@@ -86,7 +86,7 @@ public class StorageContainerImplTest {
     private RootRangeStoreFactory rrStoreFactory;
     private MetaRangeStoreFactory mrStoreFactory;
     private TableStoreFactory tableStoreFactory;
-    private StorageContainerImpl container;
+    private RangeStoreServiceImpl container;
     private OrderedScheduler scheduler;
     private RootRangeStore rrStore;
     private MVCCAsyncStore<byte[], byte[]> rrMvccStore;
@@ -111,7 +111,7 @@ public class StorageContainerImplTest {
         this.mrMvccStore = mock(MVCCAsyncStore.class);
         this.trMvccStore = mock(MVCCAsyncStore.class);
 
-        this.container = new StorageContainerImpl(
+        this.container = new RangeStoreServiceImpl(
             SCID,
             scheduler,
             mvccStoreFactory,
@@ -179,7 +179,7 @@ public class StorageContainerImplTest {
     public void testStartRootContainer() throws Exception {
         mockStorageContainer(ROOT_STORAGE_CONTAINER_ID);
 
-        StorageContainerImpl container = new StorageContainerImpl(
+        RangeStoreServiceImpl container = new RangeStoreServiceImpl(
             ROOT_STORAGE_CONTAINER_ID,
             scheduler,
             mvccStoreFactory,

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.