You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 22:03:59 UTC
[bookkeeper] 05/09: [TABLE SERVICE] use grpc reverse proxy to serve
storage container requests
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 8e4c13b5e12d94a8a6ec921d6dc28a2b0ca4a0ca
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon May 28 18:29:09 2018 -0700
[TABLE SERVICE] use grpc reverse proxy to serve storage container requests
Descriptions of the changes in this PR:
*Motivation*
*Changes*
**Client Changes**
Changed `StorageContainerChannel` to add client interceptor to stamp storage container id into the the requests' metadata.
**Server Changes**
- moved the stream storage logic out of storage containers to a `service` package. the main logic will be kept in `RangeStoreService` and `RangeStoreServiceFactory`.
- make the storage container logic for hosting `StorageContainerService`.
- Each storage container will run an inprogress grpc server for serving the grpc services registered to the container and provide an `channel` for accessing those grpc service.
- Changed the server to use reverse proxy for serving/proxying storage container requests.
*NOTE*
This change doesn't directly remove `StorageContainerRequest` and `StorageContainerResponse`. It would be done in a subsequent change.
Master Issue: #1205
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1448 from sijie/storage_container_channel
---
.../StorageContainerClientInterceptor.java | 4 +-
.../container/TestStorageContainerChannel.java | 13 +-
.../bookkeeper/common/util/ListenableFutures.java | 10 +
.../stream/protocol/ProtocolConstants.java | 3 +
.../bookkeeper/stream/server/StorageServer.java | 6 +-
.../bookkeeper/stream/server/grpc/GrpcServer.java | 28 +-
.../stream/server/grpc/GrpcServerSpec.java | 4 +-
.../server/grpc/GrpcStorageContainerService.java | 10 +-
.../stream/server/service/StorageService.java | 16 +-
.../stream/server/grpc/TestGrpcServer.java | 6 +-
...{RangeStore.java => StorageContainerStore.java} | 15 +-
.../storage/api/metadata/RangeStoreService.java | 6 +
.../stream/storage/api/sc/StorageContainer.java | 12 +-
...Container.java => StorageContainerService.java} | 36 ++-
.../api/sc/StorageContainerServiceFactory.java | 39 +++
.../api/service/RangeStoreServiceFactory.java | 41 +++
.../stream/storage/api/service/package-info.java | 22 ++
stream/storage/impl/pom.xml | 5 +
...lder.java => StorageContainerStoreBuilder.java} | 56 ++--
.../stream/storage/impl/RangeStoreImpl.java | 206 -------------
.../storage/impl/StorageContainerStoreImpl.java | 113 +++++++
.../storage/impl/grpc/GrpcMetaRangeService.java | 2 +-
.../stream/storage/impl/grpc/GrpcServices.java | 46 +++
.../stream/storage/impl/sc/Channel404.java | 80 +++++
.../impl/sc/DefaultStorageContainerFactory.java | 31 +-
.../storage/impl/sc/StorageContainer404.java | 63 ++++
.../storage/impl/sc/StorageContainerImpl.java | 336 ++++-----------------
.../impl/sc/StorageContainerRegistryImpl.java | 8 +-
.../FailRequestRangeStoreService.java} | 43 +--
.../RangeStoreContainerServiceFactoryImpl.java | 46 +++
.../service/RangeStoreContainerServiceImpl.java | 55 ++++
.../impl/service/RangeStoreServiceFactoryImpl.java | 71 +++++
.../RangeStoreServiceImpl.java} | 60 ++--
.../stream/storage/impl/service/package-info.java | 22 ++
....java => TestStorageContainerStoreBuilder.java} | 22 +-
...mpl.java => TestStorageContainerStoreImpl.java} | 244 ++++++++-------
.../impl/grpc/TestGrpcMetaRangeService.java | 8 +-
.../impl/grpc/TestGrpcRootRangeService.java | 26 +-
.../storage/impl/grpc/TestGrpcTableService.java | 20 +-
.../sc/TestDefaultStorageContainerFactory.java | 35 +--
.../impl/sc/TestStorageContainerRegistryImpl.java | 23 +-
.../impl/sc/ZkStorageContainerManagerTest.java | 15 +-
.../RangeStoreServiceImplTest.java} | 12 +-
43 files changed, 1034 insertions(+), 885 deletions(-)
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
index 284431e..1e1de08 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -18,6 +18,8 @@
package org.apache.bookkeeper.clients.impl.container;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -32,8 +34,6 @@ import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
*/
public class StorageContainerClientInterceptor implements ClientInterceptor {
- private static final String SC_ID_KEY = "SC_ID";
-
private final long scId;
private final Metadata.Key<Long> scIdKey;
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
index 9d3869b..5623335 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/container/TestStorageContainerChannel.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -58,9 +59,9 @@ public class TestStorageContainerChannel extends GrpcClientTestBase {
private OrderedScheduler scheduler;
private final LocationClient locationClient = mock(LocationClient.class);
- private StorageServerChannel mockChannel = mock(StorageServerChannel.class);
- private StorageServerChannel mockChannel2 = mock(StorageServerChannel.class);
- private StorageServerChannel mockChannel3 = mock(StorageServerChannel.class);
+ private StorageServerChannel mockChannel = newMockServerChannel();
+ private StorageServerChannel mockChannel2 = newMockServerChannel();
+ private StorageServerChannel mockChannel3 = newMockServerChannel();
private final Endpoint endpoint = Endpoint.newBuilder()
.setHostname("127.0.0.1")
.setPort(8181)
@@ -106,6 +107,12 @@ public class TestStorageContainerChannel extends GrpcClientTestBase {
}
}
+ private StorageServerChannel newMockServerChannel() {
+ StorageServerChannel channel = mock(StorageServerChannel.class);
+ when(channel.intercept(anyLong())).thenReturn(channel);
+ return channel;
+ }
+
private void ensureCallbackExecuted() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
scheduler.submit(() -> latch.countDown());
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java b/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
index 586a457..29b5fa6 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/util/ListenableFutures.java
@@ -36,6 +36,16 @@ import lombok.NoArgsConstructor;
public final class ListenableFutures {
/**
+ * Convert a {@link ListenableFuture} to a {@link CompletableFuture}.
+ *
+ * @param listenableFuture listenable future to convert.
+ * @return the completable future.
+ */
+ public static <T> CompletableFuture<T> fromListenableFuture(ListenableFuture<T> listenableFuture) {
+ return fromListenableFuture(listenableFuture, Function.identity());
+ }
+
+ /**
* Convert a {@link ListenableFuture} to a {@link CompletableFuture} and do a transformation.
*
* @param listenableFuture listenable future
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 404ec56..0e6900c 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.stream.protocol;
+import io.grpc.Metadata;
import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
import org.apache.bookkeeper.stream.proto.RangeKeyType;
import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -107,4 +108,6 @@ public final class ProtocolConstants {
.setSplitPolicy(DEFAULT_SPLIT_POLICY)
.build();
+ // storage container request metadata key
+ public static final String SC_ID_KEY = "sc-id" + Metadata.BINARY_HEADER_SUFFIX;
}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 94c8587..e0774de 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -48,7 +48,7 @@ import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
import org.apache.bookkeeper.stream.server.service.RegistrationStateService;
import org.apache.bookkeeper.stream.server.service.StatsProviderService;
import org.apache.bookkeeper.stream.server.service.StorageService;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
+import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
@@ -243,7 +243,7 @@ public class StorageServer {
rootStatsLogger.scope("dlog"));
// Create range (stream) store
- RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
.withStorageConfiguration(storageConf)
// the storage resources shared across multiple components
@@ -281,7 +281,7 @@ public class StorageServer {
storageResources,
storageConf.getServeReadOnlyTables()));
StorageService storageService = new StorageService(
- storageConf, rangeStoreBuilder, rootStatsLogger.scope("storage"));
+ storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
// Create gRPC server
StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index ce95ccf..7a5f163 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -18,18 +18,18 @@ import com.google.common.annotations.VisibleForTesting;
import io.grpc.HandlerRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.ServerServiceDefinition;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
import org.apache.bookkeeper.stream.server.exceptions.StorageServerRuntimeException;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcMetaRangeService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcRootRangeService;
-import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcTableService;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
/**
* KeyRange Server.
@@ -50,15 +50,15 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
private final Endpoint myEndpoint;
private final Server grpcServer;
- public GrpcServer(RangeStore rangeStore,
+ public GrpcServer(StorageContainerStore storageContainerStore,
StorageServerConfiguration conf,
Endpoint myEndpoint,
StatsLogger statsLogger) {
- this(rangeStore, conf, myEndpoint, null, null, statsLogger);
+ this(storageContainerStore, conf, myEndpoint, null, null, statsLogger);
}
@VisibleForTesting
- public GrpcServer(RangeStore rangeStore,
+ public GrpcServer(StorageContainerStore storageContainerStore,
StorageServerConfiguration conf,
Endpoint myEndpoint,
String localServerName,
@@ -75,12 +75,14 @@ public class GrpcServer extends AbstractLifecycleComponent<StorageServerConfigur
}
this.grpcServer = serverBuilder.build();
} else {
- this.grpcServer = ServerBuilder
- .forPort(this.myEndpoint.getPort())
- .addService(new GrpcRootRangeService(rangeStore))
- .addService(new GrpcStorageContainerService(rangeStore))
- .addService(new GrpcMetaRangeService(rangeStore))
- .addService(new GrpcTableService(rangeStore))
+ ProxyHandlerRegistry.Builder proxyRegistryBuilder = ProxyHandlerRegistry.newBuilder()
+ .setChannelFinder(storageContainerStore);
+ for (ServerServiceDefinition definition : GrpcServices.create(null)) {
+ proxyRegistryBuilder = proxyRegistryBuilder.addService(definition);
+ }
+ this.grpcServer = ServerBuilder.forPort(this.myEndpoint.getPort())
+ .addService(new GrpcStorageContainerService(storageContainerStore))
+ .fallbackHandlerRegistry(proxyRegistryBuilder.build())
.build();
}
}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
index 5b77b02..ac177e5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServerSpec.java
@@ -23,7 +23,7 @@ import lombok.experimental.Accessors;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
/**
* Spec for building a grpc server.
@@ -39,7 +39,7 @@ public class GrpcServerSpec {
*
* @return store supplier for building grpc server.
*/
- Supplier<RangeStore> storeSupplier;
+ Supplier<StorageContainerStore> storeSupplier;
/**
* Get the storage server configuration.
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
index e750bfc..8b6a26a 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcStorageContainerService.java
@@ -23,7 +23,7 @@ import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRes
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
/**
* Grpc based storage container service.
@@ -31,10 +31,10 @@ import org.apache.bookkeeper.stream.storage.api.RangeStore;
@Slf4j
class GrpcStorageContainerService extends StorageContainerServiceImplBase {
- private final RangeStore rangeStore;
+ private final StorageContainerStore storageContainerStore;
- GrpcStorageContainerService(RangeStore rangeStore) {
- this.rangeStore = rangeStore;
+ GrpcStorageContainerService(StorageContainerStore storageContainerStore) {
+ this.storageContainerStore = storageContainerStore;
}
@Override
@@ -43,7 +43,7 @@ class GrpcStorageContainerService extends StorageContainerServiceImplBase {
GetStorageContainerEndpointResponse.Builder responseBuilder = GetStorageContainerEndpointResponse.newBuilder()
.setStatusCode(StatusCode.SUCCESS);
for (int i = 0; i < request.getRequestsCount(); i++) {
- Endpoint endpoint = rangeStore
+ Endpoint endpoint = storageContainerStore
.getRoutingService()
.getStorageContainer(request.getRequests(i).getStorageContainer());
OneStorageContainerEndpointResponse.Builder oneRespBuilder;
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
index bddd6c1..2ac1804 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/StorageService.java
@@ -19,23 +19,23 @@ import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
/**
- * Service to run the storage {@link RangeStore}.
+ * Service to run the storage {@link StorageContainerStore}.
*/
@Slf4j
public class StorageService
extends AbstractLifecycleComponent<StorageConfiguration>
- implements Supplier<RangeStore> {
+ implements Supplier<StorageContainerStore> {
- private final RangeStoreBuilder storeBuilder;
- private RangeStore store;
+ private final StorageContainerStoreBuilder storeBuilder;
+ private StorageContainerStore store;
public StorageService(StorageConfiguration conf,
- RangeStoreBuilder storeBuilder,
+ StorageContainerStoreBuilder storeBuilder,
StatsLogger statsLogger) {
super("storage-service", conf, statsLogger);
this.storeBuilder = storeBuilder;
@@ -58,7 +58,7 @@ public class StorageService
}
@Override
- public RangeStore get() {
+ public StorageContainerStore get() {
return store;
}
}
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java b/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
index b05640a..4cfbea1 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
+++ b/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcServer.java
@@ -22,7 +22,7 @@ import io.grpc.util.MutableHandlerRegistry;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.server.StorageServer;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
import org.apache.commons.configuration.CompositeConfiguration;
import org.junit.Rule;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestGrpcServer {
@Test
public void testCreateLocalServer() {
GrpcServer server = new GrpcServer(
- mock(RangeStoreImpl.class),
+ mock(StorageContainerStoreImpl.class),
StorageServerConfiguration.of(compConf),
null,
name.getMethodName(),
@@ -55,7 +55,7 @@ public class TestGrpcServer {
@Test
public void testCreateBindServer() throws Exception {
GrpcServer server = new GrpcServer(
- mock(RangeStoreImpl.class),
+ mock(StorageContainerStoreImpl.class),
StorageServerConfiguration.of(compConf),
StorageServer.createLocalEndpoint(0, false),
null,
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
similarity index 68%
rename from stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java
rename to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
index 164275b..61ff588 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/RangeStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/StorageContainerStore.java
@@ -14,15 +14,15 @@
package org.apache.bookkeeper.stream.storage.api;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.component.LifecycleComponent;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.common.grpc.proxy.ChannelFinder;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
/**
- * The umbrella interface for accessing ranges (both metadata and data).
+ * The umbrella interface for accessing storage containers.
*/
-public interface RangeStore extends LifecycleComponent, RangeStoreService {
+public interface StorageContainerStore extends LifecycleComponent, ChannelFinder {
/**
* Get the routing service.
@@ -32,11 +32,10 @@ public interface RangeStore extends LifecycleComponent, RangeStoreService {
StorageContainerRoutingService getRoutingService();
/**
- * Choose the executor for a given {@code key}.
+ * Get the container registry.
*
- * @param key submit key
- * @return executor
+ * @return container registry.
*/
- ScheduledExecutorService chooseExecutor(long key);
+ StorageContainerRegistry getRegistry();
}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
index 94e63d3..2aeb317 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/RangeStoreService.java
@@ -13,6 +13,7 @@
*/
package org.apache.bookkeeper.stream.storage.api.metadata;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
/**
@@ -22,4 +23,9 @@ public interface RangeStoreService
extends MetaRangeStore,
RootRangeStore,
TableStore {
+
+ CompletableFuture<Void> start();
+
+ CompletableFuture<Void> stop();
+
}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
index 47855b0..83c5dcd 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
@@ -18,16 +18,15 @@
package org.apache.bookkeeper.stream.storage.api.sc;
+import io.grpc.Channel;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
/**
* A {@code StorageContainer} is a service container that can encapsulate metadata and data operations.
*
* <p>A {@link StorageContainer} is typically implemented by replicated state machine backed by a log.
*/
-public interface StorageContainer
- extends AutoCloseable, RangeStoreService {
+public interface StorageContainer extends AutoCloseable {
/**
* Get the storage container id.
@@ -37,6 +36,13 @@ public interface StorageContainer
long getId();
/**
+ * Get the grpc channel to interact with grpc services registered in this container.
+ *
+ * @return grpc channel.
+ */
+ Channel getChannel();
+
+ /**
* Start the storage container.
*
* @return a future represents the result of starting a storage container.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
similarity index 57%
copy from stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
index 47855b0..7014a73 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerService.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,41 +18,39 @@
package org.apache.bookkeeper.stream.storage.api.sc;
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
/**
- * A {@code StorageContainer} is a service container that can encapsulate metadata and data operations.
+ * Represents the service running within a container.
*
- * <p>A {@link StorageContainer} is typically implemented by replicated state machine backed by a log.
+ * <p>The implementation implements their core business logic and the framework provides resources like tables/streams
+ * for it.
*/
-public interface StorageContainer
- extends AutoCloseable, RangeStoreService {
+public interface StorageContainerService {
/**
- * Get the storage container id.
+ * Return the registered grpc services.
*
- * @return the storage container id.
+ * <p>The framework registers the grpc services in the container runtime. So the framework will know how to route
+ * the grpc requests to the actual grpc services.
+ * @return
*/
- long getId();
+ Collection<ServerServiceDefinition> getRegisteredServices();
/**
- * Start the storage container.
+ * Start the storage container service.
*
- * @return a future represents the result of starting a storage container.
+ * @return a future represents the result of starting a storage container service.
*/
- CompletableFuture<StorageContainer> start();
+ CompletableFuture<Void> start();
/**
- * Stop the storage container.
+ * Stop the storage container service.
*
- * @return a future represents the result of stopping a storage container.
+ * @return a future represents the result of stopping a storage container service.
*/
CompletableFuture<Void> stop();
- /**
- * Close a storage container.
- */
- void close();
-
}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java
new file mode 100644
index 0000000..e15f7d7
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.sc;
+
+/**
+ * Factory to create storage container service.
+ */
+public interface StorageContainerServiceFactory extends AutoCloseable {
+
+ /**
+ * Create a storage container service with container id <tt>scId</tt>.
+ *
+ * @param scId storage container id
+ * @return storage container id
+ */
+ StorageContainerService createStorageContainerService(long scId);
+
+ /**
+ * {@inheritDoc}
+ */
+ void close();
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java
new file mode 100644
index 0000000..075fd3e
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/RangeStoreServiceFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.service;
+
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+
+/**
+ * Factory to create range store services.
+ */
+public interface RangeStoreServiceFactory extends AutoCloseable {
+
+ /**
+ * Create a range store service that will be launched at storage container <tt>scId</tt>.
+ *
+ * @param scId storage container to run range store service.
+ * @return range store service
+ */
+ RangeStoreService createService(long scId);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ void close();
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java
new file mode 100644
index 0000000..bb88ca2
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes that provide stream storage service logic.
+ */
+package org.apache.bookkeeper.stream.storage.api.service;
\ No newline at end of file
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index b178fc7..4c656f4 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -60,6 +60,11 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper.tests</groupId>
+ <artifactId>stream-storage-tests-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
similarity index 72%
rename from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index c03b374..9c96139 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -20,20 +20,22 @@ import java.net.URI;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreContainerServiceFactoryImpl;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreServiceFactoryImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
/**
* Builder to build the storage component.
*/
-public final class RangeStoreBuilder {
+public final class StorageContainerStoreBuilder {
- public static RangeStoreBuilder newBuilder() {
- return new RangeStoreBuilder();
+ public static StorageContainerStoreBuilder newBuilder() {
+ return new StorageContainerStoreBuilder();
}
private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
@@ -45,18 +47,7 @@ public final class RangeStoreBuilder {
private MVCCStoreFactory mvccStoreFactory = null;
private URI defaultBackendUri = null;
- private RangeStoreBuilder() {
- }
-
- /**
- * Build the range store with the provided {@code numStorageContainers}.
- *
- * @param numStorageContainers number of the storage containers.
- * @return range store builder
- */
- public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
- this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
- return this;
+ private StorageContainerStoreBuilder() {
}
/**
@@ -65,7 +56,7 @@ public final class RangeStoreBuilder {
* @param placementPolicyFactory placement policy factor to create placement policies.
* @return range store builder.
*/
- public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+ public StorageContainerStoreBuilder withStorageContainerPlacementPolicyFactory(
StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
this.placementPolicyFactory = placementPolicyFactory;
return this;
@@ -77,7 +68,7 @@ public final class RangeStoreBuilder {
* @param statsLogger stats logger for collecting stats.
* @return range store builder;
*/
- public RangeStoreBuilder withStatsLogger(StatsLogger statsLogger) {
+ public StorageContainerStoreBuilder withStatsLogger(StatsLogger statsLogger) {
if (null == statsLogger) {
return this;
}
@@ -91,7 +82,7 @@ public final class RangeStoreBuilder {
* @param storeConf storage configuration
* @return range store builder
*/
- public RangeStoreBuilder withStorageConfiguration(StorageConfiguration storeConf) {
+ public StorageContainerStoreBuilder withStorageConfiguration(StorageConfiguration storeConf) {
this.storeConf = storeConf;
return this;
}
@@ -102,7 +93,7 @@ public final class RangeStoreBuilder {
* @param scmFactory storage container manager factory.
* @return range store builder
*/
- public RangeStoreBuilder withStorageContainerManagerFactory(StorageContainerManagerFactory scmFactory) {
+ public StorageContainerStoreBuilder withStorageContainerManagerFactory(StorageContainerManagerFactory scmFactory) {
this.scmFactory = scmFactory;
return this;
}
@@ -113,7 +104,7 @@ public final class RangeStoreBuilder {
* @param resources storage resources.
* @return range store builder.
*/
- public RangeStoreBuilder withStorageResources(StorageResources resources) {
+ public StorageContainerStoreBuilder withStorageResources(StorageResources resources) {
this.storeResources = resources;
return this;
}
@@ -124,7 +115,7 @@ public final class RangeStoreBuilder {
* @param storeFactory factory to create range stores.
* @return range store builder.
*/
- public RangeStoreBuilder withRangeStoreFactory(MVCCStoreFactory storeFactory) {
+ public StorageContainerStoreBuilder withRangeStoreFactory(MVCCStoreFactory storeFactory) {
this.mvccStoreFactory = storeFactory;
return this;
}
@@ -135,25 +126,32 @@ public final class RangeStoreBuilder {
* @param uri uri for storing table ranges.
* @return range store builder.
*/
- public RangeStoreBuilder withDefaultBackendUri(URI uri) {
+ public StorageContainerStoreBuilder withDefaultBackendUri(URI uri) {
this.defaultBackendUri = uri;
return this;
}
- public RangeStore build() {
+ public StorageContainerStore build() {
checkNotNull(scmFactory, "StorageContainerManagerFactory is not provided");
checkNotNull(storeConf, "StorageConfiguration is not provided");
checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
checkNotNull(defaultBackendUri, "Default backend uri is not provided");
checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
- return new RangeStoreImpl(
+ RangeStoreServiceFactoryImpl serviceFactory = new RangeStoreServiceFactoryImpl(
storeConf,
+ placementPolicyFactory.newPlacementPolicy(),
storeResources.scheduler(),
- scmFactory,
mvccStoreFactory,
- defaultBackendUri,
- placementPolicyFactory,
+ defaultBackendUri);
+
+ RangeStoreContainerServiceFactoryImpl containerServiceFactory =
+ new RangeStoreContainerServiceFactoryImpl(serviceFactory);
+
+ return new StorageContainerStoreImpl(
+ storeConf,
+ scmFactory,
+ containerServiceFactory,
statsLogger);
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
deleted file mode 100644
index f175a55..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl;
-
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.common.util.SharedResourceManager;
-import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
-
-/**
- * KeyRange Service.
- */
-public class RangeStoreImpl
- extends AbstractLifecycleComponent<StorageConfiguration>
- implements RangeStore {
-
- private final Resource<OrderedScheduler> schedulerResource;
- private final OrderedScheduler scheduler;
- private final StorageContainerManagerFactory scmFactory;
- private final StorageContainerRegistryImpl scRegistry;
- private final StorageContainerManager scManager;
- private final MVCCStoreFactory storeFactory;
-
- public RangeStoreImpl(StorageConfiguration conf,
- Resource<OrderedScheduler> schedulerResource,
- StorageContainerManagerFactory factory,
- MVCCStoreFactory mvccStoreFactory,
- URI defaultBackendUri,
- StorageContainerPlacementPolicy.Factory placementPolicyFactory,
- StatsLogger statsLogger) {
- super("range-service", conf, statsLogger);
- this.schedulerResource = schedulerResource;
- this.scheduler = SharedResourceManager.shared().get(schedulerResource);
- this.scmFactory = factory;
- this.storeFactory = mvccStoreFactory;
- StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
- this.scRegistry = new StorageContainerRegistryImpl(
- new DefaultStorageContainerFactory(
- conf,
- placementPolicy,
- scheduler,
- storeFactory,
- defaultBackendUri),
- scheduler);
- this.scManager = scmFactory.create(conf, scRegistry);
- }
-
- @Override
- public ScheduledExecutorService chooseExecutor(long key) {
- return this.scheduler.chooseThread(key);
- }
-
- @VisibleForTesting
- StorageContainerRegistryImpl getRegistry() {
- return this.scRegistry;
- }
-
- @Override
- public StorageContainerRoutingService getRoutingService() {
- return this.scManager;
- }
-
- //
- // Lifecycle management
- //
-
- @Override
- protected void doStart() {
- this.scManager.start();
- }
-
- @Override
- protected void doStop() {
- this.scManager.stop();
- this.scRegistry.close();
- }
-
- @Override
- protected void doClose() throws IOException {
- this.scManager.close();
- // stop the core scheduler
- SharedResourceManager.shared().release(
- schedulerResource,
- scheduler);
- }
-
- private StorageContainer getStorageContainer(long scId) {
- return scRegistry.getStorageContainer(scId);
- }
-
- //
- // Root Range Service
- //
-
- @Override
- public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).createNamespace(request);
- }
-
- @Override
- public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).deleteNamespace(request);
- }
-
- @Override
- public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).getNamespace(request);
- }
-
- @Override
- public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).createStream(request);
- }
-
- @Override
- public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).deleteStream(request);
- }
-
- @Override
- public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
- return getStorageContainer(ROOT_STORAGE_CONTAINER_ID).getStream(request);
- }
-
- //
- // Stream Meta Range Service
- //
-
- @Override
- public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).getActiveRanges(request);
- }
-
- //
- // Table Service
- //
-
- @Override
- public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).range(request);
- }
-
- @Override
- public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).put(request);
- }
-
- @Override
- public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).delete(request);
- }
-
- @Override
- public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).txn(request);
- }
-
- @Override
- public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
- return getStorageContainer(request.getScId()).incr(request);
- }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
new file mode 100644
index 0000000..56fcba2
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl;
+
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import java.io.IOException;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
+
+/**
+ * KeyRange Service.
+ */
+public class StorageContainerStoreImpl
+ extends AbstractLifecycleComponent<StorageConfiguration>
+ implements StorageContainerStore {
+
+ private final StorageContainerManagerFactory scmFactory;
+ private final StorageContainerRegistryImpl scRegistry;
+ private final StorageContainerManager scManager;
+ private final StorageContainerServiceFactory serviceFactory;
+ private final Metadata.Key<Long> scIdKey;
+
+ public StorageContainerStoreImpl(StorageConfiguration conf,
+ StorageContainerManagerFactory managerFactory,
+ StorageContainerServiceFactory serviceFactory,
+ StatsLogger statsLogger) {
+ super("range-service", conf, statsLogger);
+ this.scmFactory = managerFactory;
+ this.scRegistry = new StorageContainerRegistryImpl(
+ new DefaultStorageContainerFactory(serviceFactory));
+ this.scManager = scmFactory.create(conf, scRegistry);
+ this.serviceFactory = serviceFactory;
+ this.scIdKey = Metadata.Key.of(
+ SC_ID_KEY,
+ LongBinaryMarshaller.of());
+ }
+
+ @Override
+ public StorageContainerRegistryImpl getRegistry() {
+ return this.scRegistry;
+ }
+
+ @Override
+ public StorageContainerRoutingService getRoutingService() {
+ return this.scManager;
+ }
+
+ //
+ // Lifecycle management
+ //
+
+ @Override
+ protected void doStart() {
+ this.scManager.start();
+ }
+
+ @Override
+ protected void doStop() {
+ this.scManager.stop();
+ this.scRegistry.close();
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ this.scManager.close();
+ this.serviceFactory.close();
+ }
+
+ StorageContainer getStorageContainer(long scId) {
+ return scRegistry.getStorageContainer(scId);
+ }
+
+ //
+ // Utils for proxies
+ //
+
+ @Override
+ public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
+ Long scId = headers.get(scIdKey);
+ if (null == scId) {
+ // use the invalid storage container id, so it will fail the request.
+ scId = INVALID_STORAGE_CONTAINER_ID;
+ }
+ return getStorageContainer(scId).getChannel();
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index 1e4f681..2fe81e7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -34,7 +34,7 @@ public class GrpcMetaRangeService extends MetaRangeServiceImplBase {
private final RangeStoreService rangeStore;
- public GrpcMetaRangeService(RangeStore service) {
+ public GrpcMetaRangeService(RangeStoreService service) {
this.rangeStore = service;
log.info("Created MetaRange service");
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java
new file mode 100644
index 0000000..8cf8d26
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcServices.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.grpc;
+
+import com.google.common.collect.Lists;
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+
+/**
+ * Define all the grpc services associated from a range store.
+ */
+public final class GrpcServices {
+
+ private GrpcServices() {}
+
+ /**
+ * Create the grpc services of a range store.
+ *
+ * @param store range store.
+ * @return the list of grpc services should be associated with the range store.
+ */
+ public static Collection<ServerServiceDefinition> create(RangeStoreService store) {
+ return Lists.newArrayList(
+ new GrpcRootRangeService(store).bindService(),
+ new GrpcMetaRangeService(store).bindService(),
+ new GrpcTableService(store).bindService());
+ }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java
new file mode 100644
index 0000000..4905aa8
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/Channel404.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import javax.annotation.Nullable;
+
+/**
+ * A channel that always throws {@link Status#NOT_FOUND}.
+ */
+final class Channel404 extends Channel {
+
+ static Channel404 of() {
+ return INSTANCE;
+ }
+
+ private static final Channel404 INSTANCE = new Channel404();
+
+ private static final ClientCall<Object, Object> CALL404 = new ClientCall<Object, Object>() {
+ @Override
+ public void start(Listener<Object> responseListener, Metadata headers) {
+ responseListener.onClose(Status.NOT_FOUND, new Metadata());
+ }
+
+ @Override
+ public void request(int numMessages) {
+ // no-op
+ }
+
+ @Override
+ public void cancel(@Nullable String message, @Nullable Throwable cause) {
+ // no-op
+ }
+
+ @Override
+ public void halfClose() {
+ // no-op
+ }
+
+ @Override
+ public void sendMessage(Object message) {
+ // no-op
+ }
+ };
+
+ private Channel404() {}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+ MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+ return (ClientCall<RequestT, ResponseT>) CALL404;
+ }
+
+ @Override
+ public String authority() {
+ return null;
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
index 9fbf595..2073c0b 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerFactory.java
@@ -14,45 +14,24 @@
package org.apache.bookkeeper.stream.storage.impl.sc;
-import java.net.URI;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
/**
* The default storage container factory for creating {@link StorageContainer}s.
*/
public class DefaultStorageContainerFactory implements StorageContainerFactory {
- private final StorageConfiguration storageConf;
- private final StorageContainerPlacementPolicy rangePlacementPolicy;
- private final OrderedScheduler scheduler;
- private final MVCCStoreFactory storeFactory;
- private final URI defaultBackendUri;
+ private final StorageContainerServiceFactory serviceFactory;
- public DefaultStorageContainerFactory(StorageConfiguration storageConf,
- StorageContainerPlacementPolicy rangePlacementPolicy,
- OrderedScheduler scheduler,
- MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
- this.storageConf = storageConf;
- this.rangePlacementPolicy = rangePlacementPolicy;
- this.scheduler = scheduler;
- this.storeFactory = storeFactory;
- this.defaultBackendUri = defaultBackendUri;
+ public DefaultStorageContainerFactory(StorageContainerServiceFactory serviceFactory) {
+ this.serviceFactory = serviceFactory;
}
@Override
public StorageContainer createStorageContainer(long scId) {
return new StorageContainerImpl(
- storageConf,
- scId,
- rangePlacementPolicy,
- scheduler,
- storeFactory,
- defaultBackendUri);
+ serviceFactory, scId);
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java
new file mode 100644
index 0000000..1d87c41
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainer404.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+
+/**
+ * A storage container that always responds {@link io.grpc.Status#NOT_FOUND}.
+ */
+final class StorageContainer404 implements StorageContainer {
+
+ static StorageContainer404 of() {
+ return INSTANCE;
+ }
+
+ private static final StorageContainer404 INSTANCE = new StorageContainer404();
+
+ private StorageContainer404() {}
+
+ @Override
+ public long getId() {
+ return -404;
+ }
+
+ @Override
+ public Channel getChannel() {
+ return Channel404.of();
+ }
+
+ @Override
+ public CompletableFuture<StorageContainer> start() {
+ return FutureUtils.value(this);
+ }
+
+ @Override
+ public CompletableFuture<Void> stop() {
+ return FutureUtils.Void();
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
index 52fa060..4bdc801 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
@@ -18,123 +18,47 @@
package org.apache.bookkeeper.stream.storage.impl.sc;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
-
-import com.google.common.collect.Lists;
-import java.net.URI;
-import java.util.List;
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.io.IOException;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
-import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.protocol.RangeId;
-import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
-import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
-import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
-import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.metadata.MetaRangeStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.metadata.MetaRangeStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreFactory;
-import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
/**
* The default implementation of {@link StorageContainer}.
*/
@Slf4j
-public class StorageContainerImpl
- implements StorageContainer {
+class StorageContainerImpl implements StorageContainer {
+ private final String containerName;
+ private final StorageContainerService service;
private final long scId;
+ private final Server grpcServer;
+ private volatile Channel channel = Channel404.of();
- // store container that used for fail requests.
- private final StorageContainer failRequestStorageContainer;
- // store factory
- private final MVCCStoreFactory storeFactory;
- // storage container
- @Getter(value = AccessLevel.PACKAGE)
- private MetaRangeStore mgStore;
- @Getter(value = AccessLevel.PACKAGE)
- private final MetaRangeStoreFactory mrStoreFactory;
- // root range
- @Getter(value = AccessLevel.PACKAGE)
- private RootRangeStore rootRange;
- @Getter(value = AccessLevel.PACKAGE)
- private final RootRangeStoreFactory rrStoreFactory;
- // table range stores
- @Getter(value = AccessLevel.PACKAGE)
- private final TableStoreCache tableStoreCache;
- @Getter(value = AccessLevel.PACKAGE)
- private final TableStoreFactory tableStoreFactory;
-
- public StorageContainerImpl(StorageConfiguration storageConf,
- long scId,
- StorageContainerPlacementPolicy rangePlacementPolicy,
- OrderedScheduler scheduler,
- MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
- this(
- scId,
- scheduler,
- storeFactory,
- store -> new RootRangeStoreImpl(
- defaultBackendUri, store, rangePlacementPolicy, scheduler.chooseThread(scId)),
- store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId)),
- store -> new TableStoreImpl(store));
+ StorageContainerImpl(StorageContainerServiceFactory serviceFactory, long scId) {
+ this.scId = scId;
+ this.service = serviceFactory.createStorageContainerService(scId);
+ this.containerName = "container-" + scId;
+ this.grpcServer = buildGrpcServer(containerName, service.getRegisteredServices());
}
- public StorageContainerImpl(long scId,
- OrderedScheduler scheduler,
- MVCCStoreFactory storeFactory,
- RootRangeStoreFactory rrStoreFactory,
- MetaRangeStoreFactory mrStoreFactory,
- TableStoreFactory tableStoreFactory) {
- this.scId = scId;
- this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
- this.rootRange = failRequestStorageContainer;
- this.mgStore = failRequestStorageContainer;
- this.storeFactory = storeFactory;
- this.rrStoreFactory = rrStoreFactory;
- this.mrStoreFactory = mrStoreFactory;
- this.tableStoreFactory = tableStoreFactory;
- this.tableStoreCache = new TableStoreCache(storeFactory, tableStoreFactory);
+ private static Server buildGrpcServer(String serverName,
+ Collection<ServerServiceDefinition> services) {
+ InProcessServerBuilder builder = InProcessServerBuilder.forName(serverName);
+ services.forEach(service -> builder.addService(service));
+ return builder
+ .directExecutor()
+ .build();
}
//
@@ -146,42 +70,24 @@ public class StorageContainerImpl
return scId;
}
- private CompletableFuture<Void> startRootRangeStore() {
- if (ROOT_STORAGE_CONTAINER_ID != scId) {
- return FutureUtils.Void();
- }
- return storeFactory.openStore(
- ROOT_STORAGE_CONTAINER_ID,
- ROOT_STREAM_ID,
- ROOT_RANGE_ID
- ).thenApply(store -> {
- rootRange = rrStoreFactory.createStore(store);
- return null;
- });
- }
-
- private CompletableFuture<Void> startMetaRangeStore(long scId) {
- return storeFactory.openStore(
- scId,
- CONTAINER_META_STREAM_ID,
- CONTAINER_META_RANGE_ID
- ).thenApply(store -> {
- mgStore = mrStoreFactory.createStore(store);
- return null;
- });
- }
-
@Override
public CompletableFuture<StorageContainer> start() {
log.info("Starting storage container ({}) ...", getId());
- List<CompletableFuture<Void>> futures = Lists.newArrayList(
- startRootRangeStore(),
- startMetaRangeStore(scId));
-
- return FutureUtils.collect(futures).thenApply(ignored -> {
- log.info("Successfully started storage container ({}).", getId());
- return StorageContainerImpl.this;
+ return service.start().thenCompose(ignored -> {
+ try {
+ grpcServer.start();
+ log.info("Successfully started storage container ({}).", getId());
+
+ channel = InProcessChannelBuilder.forName(containerName)
+ .usePlaintext()
+ .directExecutor()
+ .build();
+ return FutureUtils.value(StorageContainerImpl.this);
+ } catch (IOException e) {
+ log.error("Failed to start the grpc server for storage container ({})", getId(), e);
+ return FutureUtils.exception(e);
+ }
});
}
@@ -189,160 +95,40 @@ public class StorageContainerImpl
public CompletableFuture<Void> stop() {
log.info("Stopping storage container ({}) ...", getId());
- return storeFactory.closeStores(scId).thenApply(ignored -> {
- log.info("Successfully stopped storage container ({}).", getId());
- return null;
- });
- }
+ Channel existingChannel = channel;
- @Override
- public void close() {
- stop().join();
- }
-
- //
- // Storage Container API
- //
-
- //
- // Namespace API
- //
-
- @Override
- public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
- return rootRange.createNamespace(request);
- }
+ // set channel to 404
+ channel = Channel404.of();
- @Override
- public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
- return rootRange.deleteNamespace(request);
- }
-
- @Override
- public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
- return rootRange.getNamespace(request);
- }
-
- //
- // Stream API.
- //
-
- @Override
- public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
- return rootRange.createStream(request);
- }
-
- @Override
- public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
- return rootRange.deleteStream(request);
- }
+ if (null != existingChannel) {
+ if (existingChannel instanceof ManagedChannel) {
+ ((ManagedChannel) existingChannel).shutdown();
+ }
+ }
- @Override
- public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
- return rootRange.getStream(request);
- }
+ if (null != grpcServer) {
+ grpcServer.shutdown();
+ }
- //
- // Stream Meta Range API.
- //
+ return service.stop().thenApply(ignored -> {
- @Override
- public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
- return mgStore.getActiveRanges(request);
+ log.info("Successfully stopped storage container ({}).", getId());
+ return null;
+ });
}
//
- // Table API
+ // Grpc
//
-
- @Override
- public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
- checkArgument(KV_RANGE_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- RangeRequest rr = request.getKvRangeReq();
- RoutingHeader header = rr.getHeader();
-
- RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
- TableStore store = tableStoreCache.getTableStore(rid);
- if (null != store) {
- return store.range(request);
- } else {
- return tableStoreCache.openTableStore(scId, rid)
- .thenCompose(s -> s.range(request));
- }
- }
-
- @Override
- public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
- checkArgument(KV_PUT_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- PutRequest rr = request.getKvPutReq();
- RoutingHeader header = rr.getHeader();
-
- RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
- TableStore store = tableStoreCache.getTableStore(rid);
- if (null != store) {
- return store.put(request);
- } else {
- return tableStoreCache.openTableStore(scId, rid)
- .thenCompose(s -> s.put(request));
- }
- }
-
@Override
- public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
- checkArgument(KV_DELETE_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- DeleteRangeRequest rr = request.getKvDeleteReq();
- RoutingHeader header = rr.getHeader();
-
- RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
- TableStore store = tableStoreCache.getTableStore(rid);
- if (null != store) {
- return store.delete(request);
- } else {
- return tableStoreCache.openTableStore(scId, rid)
- .thenCompose(s -> s.delete(request));
- }
+ public Channel getChannel() {
+ return channel;
}
@Override
- public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
- checkArgument(KV_TXN_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- TxnRequest rr = request.getKvTxnReq();
- RoutingHeader header = rr.getHeader();
-
- RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
- TableStore store = tableStoreCache.getTableStore(rid);
- if (null != store) {
- return store.txn(request);
- } else {
- return tableStoreCache.openTableStore(scId, rid)
- .thenCompose(s -> s.txn(request));
- }
+ public void close() {
+ stop().join();
}
- @Override
- public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
- checkArgument(KV_INCR_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- IncrementRequest ir = request.getKvIncrReq();
- RoutingHeader header = ir.getHeader();
-
- RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
- TableStore store = tableStoreCache.getTableStore(rid);
- if (null != store) {
- return store.incr(request);
- } else {
- return tableStoreCache.openTableStore(scId, rid)
- .thenCompose(s -> s.incr(request));
- }
- }
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 4499801..48330b7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.exceptions.ObjectClosedException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
@@ -39,13 +38,10 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
private final StorageContainerFactory scFactory;
private final ConcurrentMap<Long, StorageContainer> containers;
private final ReentrantReadWriteLock closeLock;
- private final StorageContainer failRequestStorageContainer;
private boolean closed = false;
- public StorageContainerRegistryImpl(StorageContainerFactory factory,
- OrderedScheduler scheduler) {
+ public StorageContainerRegistryImpl(StorageContainerFactory factory) {
this.scFactory = factory;
- this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
this.containers = Maps.newConcurrentMap();
this.closeLock = new ReentrantReadWriteLock();
}
@@ -62,7 +58,7 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
@Override
public StorageContainer getStorageContainer(long storageContainerId) {
- return containers.getOrDefault(storageContainerId, failRequestStorageContainer);
+ return containers.getOrDefault(storageContainerId, StorageContainer404.of());
}
@Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
similarity index 85%
rename from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
index 8217e89..e15b808 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import io.grpc.Status;
@@ -40,49 +39,39 @@ import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
/**
* It is a single-ton implementation that fails all requests.
*/
-public final class FailRequestStorageContainer implements StorageContainer {
+final class FailRequestRangeStoreService implements RangeStoreService {
- public static StorageContainer of(OrderedScheduler scheduler) {
- return new FailRequestStorageContainer(scheduler);
+ static RangeStoreService of(OrderedScheduler scheduler) {
+ return new FailRequestRangeStoreService(scheduler);
}
private final OrderedScheduler scheduler;
- private FailRequestStorageContainer(OrderedScheduler scheduler) {
+ private FailRequestRangeStoreService(OrderedScheduler scheduler) {
this.scheduler = scheduler;
}
- @Override
- public long getId() {
- return INVALID_STORAGE_CONTAINER_ID;
+ private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
+ CompletableFuture<T> future = FutureUtils.createFuture();
+ scheduler.executeOrdered(scId, () -> {
+ future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
+ });
+ return future;
}
@Override
- public CompletableFuture<StorageContainer> start() {
- return CompletableFuture.completedFuture(this);
+ public CompletableFuture<Void> start() {
+ return FutureUtils.Void();
}
@Override
public CompletableFuture<Void> stop() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public void close() {
- // no-op
- }
-
- private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
- CompletableFuture<T> future = FutureUtils.createFuture();
- scheduler.executeOrdered(scId, () -> {
- future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
- });
- return future;
+ return FutureUtils.Void();
}
//
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java
new file mode 100644
index 0000000..77a7950
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceFactoryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
+
+/**
+ * The default storage container factory for creating {@link StorageContainer}s.
+ */
+public class RangeStoreContainerServiceFactoryImpl implements StorageContainerServiceFactory {
+
+ private final RangeStoreServiceFactory serviceFactory;
+
+ public RangeStoreContainerServiceFactoryImpl(RangeStoreServiceFactory serviceFactory) {
+ this.serviceFactory = serviceFactory;
+ }
+
+ @Override
+ public StorageContainerService createStorageContainerService(long scId) {
+ return new RangeStoreContainerServiceImpl(serviceFactory.createService(scId));
+ }
+
+ @Override
+ public void close() {
+ serviceFactory.close();
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java
new file mode 100644
index 0000000..8a8f6d3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreContainerServiceImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import io.grpc.ServerServiceDefinition;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
+
+/**
+ * Implement the range service that runs on one storage container.
+ */
+class RangeStoreContainerServiceImpl implements StorageContainerService {
+
+ private final RangeStoreService store;
+ private final Collection<ServerServiceDefinition> grpcServices;
+
+ RangeStoreContainerServiceImpl(RangeStoreService service) {
+ this.store = service;
+ this.grpcServices = GrpcServices.create(service);
+ }
+
+ @Override
+ public Collection<ServerServiceDefinition> getRegisteredServices() {
+ return grpcServices;
+ }
+
+ @Override
+ public CompletableFuture<Void> start() {
+ return store.start();
+ }
+
+ @Override
+ public CompletableFuture<Void> stop() {
+ return store.stop();
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
new file mode 100644
index 0000000..02dd8a3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.service;
+
+import java.net.URI;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SharedResourceManager;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
+
+/**
+ * Default implementation of {@link RangeStoreServiceFactory}.
+ */
+public class RangeStoreServiceFactoryImpl implements RangeStoreServiceFactory {
+
+ private final StorageConfiguration storageConf;
+ private final StorageContainerPlacementPolicy rangePlacementPolicy;
+ private final Resource<OrderedScheduler> schedulerResource;
+ private final OrderedScheduler scheduler;
+ private final MVCCStoreFactory storeFactory;
+ private final URI defaultBackendUri;
+
+ public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
+ StorageContainerPlacementPolicy rangePlacementPolicy,
+ Resource<OrderedScheduler> schedulerResource,
+ MVCCStoreFactory storeFactory,
+ URI defaultBackendUri) {
+ this.storageConf = storageConf;
+ this.rangePlacementPolicy = rangePlacementPolicy;
+ this.schedulerResource = schedulerResource;
+ this.scheduler = SharedResourceManager.shared().get(schedulerResource);
+ this.storeFactory = storeFactory;
+ this.defaultBackendUri = defaultBackendUri;
+ }
+
+ @Override
+ public RangeStoreService createService(long scId) {
+ return new RangeStoreServiceImpl(
+ storageConf,
+ scId,
+ rangePlacementPolicy,
+ scheduler,
+ storeFactory,
+ defaultBackendUri);
+ }
+
+ @Override
+ public void close() {
+ SharedResourceManager.shared().release(schedulerResource, scheduler);
+ }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
similarity index 86%
copy from stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
copy to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 52fa060..0304fc8 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
@@ -63,8 +63,8 @@ import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
@@ -76,16 +76,13 @@ import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
/**
- * The default implementation of {@link StorageContainer}.
+ * The service implementation running in a storage container.
*/
@Slf4j
-public class StorageContainerImpl
- implements StorageContainer {
+class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
private final long scId;
- // store container that used for fail requests.
- private final StorageContainer failRequestStorageContainer;
// store factory
private final MVCCStoreFactory storeFactory;
// storage container
@@ -104,12 +101,12 @@ public class StorageContainerImpl
@Getter(value = AccessLevel.PACKAGE)
private final TableStoreFactory tableStoreFactory;
- public StorageContainerImpl(StorageConfiguration storageConf,
- long scId,
- StorageContainerPlacementPolicy rangePlacementPolicy,
- OrderedScheduler scheduler,
- MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
+ RangeStoreServiceImpl(StorageConfiguration storageConf,
+ long scId,
+ StorageContainerPlacementPolicy rangePlacementPolicy,
+ OrderedScheduler scheduler,
+ MVCCStoreFactory storeFactory,
+ URI defaultBackendUri) {
this(
scId,
scheduler,
@@ -120,14 +117,15 @@ public class StorageContainerImpl
store -> new TableStoreImpl(store));
}
- public StorageContainerImpl(long scId,
- OrderedScheduler scheduler,
- MVCCStoreFactory storeFactory,
- RootRangeStoreFactory rrStoreFactory,
- MetaRangeStoreFactory mrStoreFactory,
- TableStoreFactory tableStoreFactory) {
+ RangeStoreServiceImpl(long scId,
+ OrderedScheduler scheduler,
+ MVCCStoreFactory storeFactory,
+ RootRangeStoreFactory rrStoreFactory,
+ MetaRangeStoreFactory mrStoreFactory,
+ TableStoreFactory tableStoreFactory) {
this.scId = scId;
- this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
+ RangeStoreService failRequestStorageContainer =
+ FailRequestRangeStoreService.of(scheduler);
this.rootRange = failRequestStorageContainer;
this.mgStore = failRequestStorageContainer;
this.storeFactory = storeFactory;
@@ -141,7 +139,6 @@ public class StorageContainerImpl
// Services
//
- @Override
public long getId() {
return scId;
}
@@ -171,28 +168,16 @@ public class StorageContainerImpl
});
}
- @Override
- public CompletableFuture<StorageContainer> start() {
- log.info("Starting storage container ({}) ...", getId());
-
+ public CompletableFuture<Void> start() {
List<CompletableFuture<Void>> futures = Lists.newArrayList(
startRootRangeStore(),
startMetaRangeStore(scId));
- return FutureUtils.collect(futures).thenApply(ignored -> {
- log.info("Successfully started storage container ({}).", getId());
- return StorageContainerImpl.this;
- });
+ return FutureUtils.collect(futures).thenApply(ignored -> null);
}
- @Override
public CompletableFuture<Void> stop() {
- log.info("Stopping storage container ({}) ...", getId());
-
- return storeFactory.closeStores(scId).thenApply(ignored -> {
- log.info("Successfully stopped storage container ({}).", getId());
- return null;
- });
+ return storeFactory.closeStores(scId);
}
@Override
@@ -345,4 +330,5 @@ public class StorageContainerImpl
.thenCompose(s -> s.incr(request));
}
}
+
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java
new file mode 100644
index 0000000..68ad094
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes that actually implement the storage services.
+ */
+package org.apache.bookkeeper.stream.storage.impl.service;
\ No newline at end of file
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
similarity index 84%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 85d3ef0..3d3a0ad 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestRangeStoreBuilder.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,18 +18,18 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.net.URI;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.StorageContainerStoreImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
import org.junit.Before;
import org.junit.Test;
/**
- * Unit test for {@link RangeStoreBuilder}.
+ * Unit test for {@link StorageContainerStoreBuilder}.
*/
-public class TestRangeStoreBuilder {
+public class TestStorageContainerStoreBuilder {
private MVCCStoreFactory storeFactory;
private final URI uri = URI.create("distributedlog://127.0.0.1/stream/storage");
@@ -41,7 +41,7 @@ public class TestRangeStoreBuilder {
@Test(expected = NullPointerException.class)
public void testBuildNullConfiguration() {
- RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(null)
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
@@ -52,7 +52,7 @@ public class TestRangeStoreBuilder {
@Test(expected = NullPointerException.class)
public void testBuildNullResources() {
- RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(mock(StorageConfiguration.class))
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(null)
@@ -63,7 +63,7 @@ public class TestRangeStoreBuilder {
@Test(expected = NullPointerException.class)
public void testBuildNullRGManagerFactory() {
- RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(mock(StorageConfiguration.class))
.withStorageContainerManagerFactory(null)
.withStorageResources(StorageResources.create())
@@ -74,7 +74,7 @@ public class TestRangeStoreBuilder {
@Test(expected = NullPointerException.class)
public void testBuildNullStoreFactory() {
- RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(mock(StorageConfiguration.class))
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
@@ -85,7 +85,7 @@ public class TestRangeStoreBuilder {
@Test(expected = NullPointerException.class)
public void testBuildNullDefaultBackendUri() {
- RangeStoreBuilder.newBuilder()
+ StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(mock(StorageConfiguration.class))
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
@@ -96,14 +96,14 @@ public class TestRangeStoreBuilder {
@Test
public void testBuild() {
- RangeStore rangeStore = RangeStoreBuilder.newBuilder()
+ StorageContainerStore storageContainerStore = StorageContainerStoreBuilder.newBuilder()
.withStorageConfiguration(mock(StorageConfiguration.class))
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
.withDefaultBackendUri(uri)
.build();
- assertTrue(rangeStore instanceof RangeStoreImpl);
+ assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
}
}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
similarity index 69%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index ac4bb78..a269466 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -14,6 +14,7 @@
package org.apache.bookkeeper.stream.storage.impl;
+import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenableFuture;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
@@ -33,14 +34,24 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import java.net.URI;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StreamName;
import org.apache.bookkeeper.stream.proto.StreamProperties;
@@ -65,14 +76,21 @@ import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceFutureStub;
+import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
-import org.apache.bookkeeper.stream.storage.StorageResources;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
import org.apache.bookkeeper.stream.storage.impl.sc.LocalStorageContainerManager;
+import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreContainerServiceFactoryImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
import org.apache.commons.configuration.CompositeConfiguration;
import org.junit.After;
@@ -80,10 +98,10 @@ import org.junit.Before;
import org.junit.Test;
/**
- * Unit test of {@link RangeStoreImpl}.
+ * Unit test of {@link StorageContainerStoreImpl}.
*/
@Slf4j
-public class TestRangeStoreImpl {
+public class TestStorageContainerStoreImpl {
private static final StreamProperties streamProps = StreamProperties.newBuilder()
.setStorageContainerId(System.currentTimeMillis())
@@ -115,8 +133,13 @@ public class TestRangeStoreImpl {
NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build();
- private StorageResources storageResources;
- private RangeStoreImpl rangeStore;
+ private RangeStoreService mockRangeStoreService;
+ private StorageContainerStoreImpl rangeStore;
+ private Server server;
+ private Channel channel;
+ private TableServiceFutureStub tableService;
+ private RootRangeServiceFutureStub rootRangeService;
+ private MetaRangeServiceFutureStub metaRangeService;
//
// Utils for table api
@@ -196,8 +219,6 @@ public class TestRangeStoreImpl {
@SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
- storageResources = StorageResources.create();
-
Endpoint endpoint = createEndpoint("127.0.0.1", 0);
// create the client manager
@@ -208,19 +229,61 @@ public class TestRangeStoreImpl {
when(storeFactory.closeStores(anyLong()))
.thenReturn(FutureUtils.Void());
- rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
- .withStorageConfiguration(storageConf)
- .withStorageResources(storageResources)
- .withStorageContainerManagerFactory((storeConf, rgRegistry)
- -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
- .withRangeStoreFactory(storeFactory)
- .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
- .build();
+ RangeStoreServiceFactory rangeStoreServiceFactory = mock(RangeStoreServiceFactory.class);
+ mockRangeStoreService = mock(RangeStoreService.class);
+ when(mockRangeStoreService.start()).thenReturn(FutureUtils.Void());
+ when(mockRangeStoreService.stop()).thenReturn(FutureUtils.Void());
+ when(rangeStoreServiceFactory.createService(anyLong()))
+ .thenReturn(mockRangeStoreService);
+
+ rangeStore = new StorageContainerStoreImpl(
+ storageConf,
+ (storeConf, rgRegistry)
+ -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2),
+ new RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+ NullStatsLogger.INSTANCE);
+
rangeStore.start();
+
+ final String serverName = "test-server";
+
+ Collection<ServerServiceDefinition> grpcServices = GrpcServices.create(null);
+ ProxyHandlerRegistry.Builder registryBuilder = ProxyHandlerRegistry.newBuilder();
+ grpcServices.forEach(service -> registryBuilder.addService(service));
+ ProxyHandlerRegistry registry = registryBuilder
+ .setChannelFinder(rangeStore)
+ .build();
+ server = InProcessServerBuilder.forName(serverName)
+ .fallbackHandlerRegistry(registry)
+ .directExecutor()
+ .build()
+ .start();
+
+ channel = InProcessChannelBuilder.forName(serverName)
+ .usePlaintext()
+ .build();
+
+ // intercept the channel with storage container information.
+ channel = ClientInterceptors.intercept(
+ channel,
+ new StorageContainerClientInterceptor(0L));
+
+
+ tableService = TableServiceGrpc.newFutureStub(channel);
+ metaRangeService = MetaRangeServiceGrpc.newFutureStub(channel);
+ rootRangeService = RootRangeServiceGrpc.newFutureStub(channel);
}
@After
public void tearDown() {
+ if (null != channel) {
+ if (channel instanceof ManagedChannel) {
+ ((ManagedChannel) channel).shutdown();
+ }
+ }
+ if (null != server) {
+ server.shutdown();
+ }
if (null != rangeStore) {
rangeStore.close();
}
@@ -247,8 +310,8 @@ public class TestRangeStoreImpl {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
String colName = "test-create-namespace-no-root-storage-container-store";
- verifyNotFoundException(
- rangeStore.createNamespace(createCreateNamespaceRequest(colName, namespaceConf)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.createNamespace(createCreateNamespaceRequest(colName, namespaceConf))),
Status.NOT_FOUND);
}
@@ -257,8 +320,8 @@ public class TestRangeStoreImpl {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
String colName = "test-delete-namespace-no-root-storage-container-store";
- verifyNotFoundException(
- rangeStore.deleteNamespace(createDeleteNamespaceRequest(colName)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.deleteNamespace(createDeleteNamespaceRequest(colName))),
Status.NOT_FOUND);
}
@@ -267,8 +330,8 @@ public class TestRangeStoreImpl {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
String colName = "test-get-namespace-no-root-storage-container-store";
- verifyNotFoundException(
- rangeStore.getNamespace(createGetNamespaceRequest(colName)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.getNamespace(createGetNamespaceRequest(colName))),
Status.NOT_FOUND);
}
@@ -276,41 +339,35 @@ public class TestRangeStoreImpl {
public void testCreateNamespaceMockRootStorageContainerStore() throws Exception {
String colName = "test-create-namespace-mock-root-storage-container-store";
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
.setCode(StatusCode.NAMESPACE_EXISTS)
.build();
CreateNamespaceRequest request = createCreateNamespaceRequest(colName, namespaceConf);
- when(scStore.createNamespace(request))
+ when(mockRangeStoreService.createNamespace(request))
.thenReturn(CompletableFuture.completedFuture(createResp));
CompletableFuture<CreateNamespaceResponse> createRespFuture =
- rangeStore.createNamespace(request);
- verify(scStore, times(1)).createNamespace(request);
+ fromListenableFuture(rootRangeService.createNamespace(request));
assertTrue(createResp == createRespFuture.get());
+ verify(mockRangeStoreService, times(1)).createNamespace(request);
}
@Test
public void testDeleteNamespaceMockRootStorageContainerStore() throws Exception {
String colName = "test-delete-namespace-no-root-storage-container-store";
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
.setCode(StatusCode.NAMESPACE_NOT_FOUND)
.build();
DeleteNamespaceRequest request = createDeleteNamespaceRequest(colName);
- when(scStore.deleteNamespace(request))
+ when(mockRangeStoreService.deleteNamespace(request))
.thenReturn(CompletableFuture.completedFuture(deleteResp));
CompletableFuture<DeleteNamespaceResponse> deleteRespFuture =
- rangeStore.deleteNamespace(request);
- verify(scStore, times(1)).deleteNamespace(request);
+ fromListenableFuture(rootRangeService.deleteNamespace(request));
+ verify(mockRangeStoreService, times(1)).deleteNamespace(request);
assertTrue(deleteResp == deleteRespFuture.get());
}
@@ -318,20 +375,17 @@ public class TestRangeStoreImpl {
public void testGetNamespaceMockRootStorageContainerStore() throws Exception {
String colName = "test-get-namespace-no-root-storage-container-store";
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
.setCode(StatusCode.NAMESPACE_NOT_FOUND)
.build();
GetNamespaceRequest request = createGetNamespaceRequest(colName);
- when(scStore.getNamespace(request)).thenReturn(
+ when(mockRangeStoreService.getNamespace(request)).thenReturn(
CompletableFuture.completedFuture(getResp));
CompletableFuture<GetNamespaceResponse> getRespFuture =
- rangeStore.getNamespace(request);
- verify(scStore, times(1)).getNamespace(request);
+ fromListenableFuture(rootRangeService.getNamespace(request));
+ verify(mockRangeStoreService, times(1)).getNamespace(request);
assertTrue(getResp == getRespFuture.get());
}
@@ -345,8 +399,8 @@ public class TestRangeStoreImpl {
String colName = "test-create-namespace-no-root-storage-container-store";
String streamName = colName;
- verifyNotFoundException(
- rangeStore.createStream(createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.createStream(createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF))),
Status.NOT_FOUND);
}
@@ -356,8 +410,8 @@ public class TestRangeStoreImpl {
String colName = "test-delete-namespace-no-root-storage-container-store";
String streamName = colName;
- verifyNotFoundException(
- rangeStore.deleteStream(createDeleteStreamRequest(colName, streamName)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.deleteStream(createDeleteStreamRequest(colName, streamName))),
Status.NOT_FOUND);
}
@@ -367,8 +421,8 @@ public class TestRangeStoreImpl {
String colName = "test-get-namespace-no-root-storage-container-store";
String streamName = colName;
- verifyNotFoundException(
- rangeStore.getStream(createGetStreamRequest(colName, streamName)),
+ verifyNotFoundException(fromListenableFuture(
+ rootRangeService.getStream(createGetStreamRequest(colName, streamName))),
Status.NOT_FOUND);
}
@@ -377,19 +431,16 @@ public class TestRangeStoreImpl {
String colName = "test-create-namespace-mock-root-storage-container-store";
String streamName = colName;
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
.setCode(StatusCode.STREAM_EXISTS)
.build();
CreateStreamRequest createReq = createCreateStreamRequest(colName, streamName, DEFAULT_STREAM_CONF);
- when(scStore.createStream(createReq)).thenReturn(
+ when(mockRangeStoreService.createStream(createReq)).thenReturn(
CompletableFuture.completedFuture(createResp));
CompletableFuture<CreateStreamResponse> createRespFuture =
- rangeStore.createStream(createReq);
- verify(scStore, times(1)).createStream(createReq);
+ fromListenableFuture(rootRangeService.createStream(createReq));
+ verify(mockRangeStoreService, times(1)).createStream(createReq);
assertTrue(createResp == createRespFuture.get());
}
@@ -398,19 +449,16 @@ public class TestRangeStoreImpl {
String colName = "test-delete-namespace-no-root-storage-container-store";
String streamName = colName;
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
.setCode(StatusCode.STREAM_NOT_FOUND)
.build();
DeleteStreamRequest deleteReq = createDeleteStreamRequest(colName, streamName);
- when(scStore.deleteStream(deleteReq)).thenReturn(
+ when(mockRangeStoreService.deleteStream(deleteReq)).thenReturn(
CompletableFuture.completedFuture(deleteResp));
CompletableFuture<DeleteStreamResponse> deleteRespFuture =
- rangeStore.deleteStream(deleteReq);
- verify(scStore, times(1)).deleteStream(deleteReq);
+ fromListenableFuture(rootRangeService.deleteStream(deleteReq));
+ verify(mockRangeStoreService, times(1)).deleteStream(deleteReq);
assertTrue(deleteResp == deleteRespFuture.get());
}
@@ -419,26 +467,25 @@ public class TestRangeStoreImpl {
String colName = "test-get-namespace-no-root-storage-container-store";
String streamName = colName;
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
GetStreamResponse getResp = GetStreamResponse.newBuilder()
.setCode(StatusCode.STREAM_NOT_FOUND)
.build();
GetStreamRequest getReq = createGetStreamRequest(colName, streamName);
- when(scStore.getStream(getReq)).thenReturn(
+ when(mockRangeStoreService.getStream(getReq)).thenReturn(
CompletableFuture.completedFuture(getResp));
CompletableFuture<GetStreamResponse> getRespFuture =
- rangeStore.getStream(getReq);
- verify(scStore, times(1)).getStream(getReq);
+ fromListenableFuture(rootRangeService.getStream(getReq));
+ verify(mockRangeStoreService, times(1)).getStream(getReq);
assertTrue(getResp == getRespFuture.get());
}
@Test
public void testGetActiveRangesNoManager() throws Exception {
- verifyNotFoundException(
- rangeStore.getActiveRanges(createGetActiveRangesRequest(12L, 34L)),
+ rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+
+ verifyNotFoundException(fromListenableFuture(
+ metaRangeService.getActiveRanges(createGetActiveRangesRequest(12L, 34L))),
Status.NOT_FOUND);
}
@@ -446,25 +493,17 @@ public class TestRangeStoreImpl {
public void testGetActiveRangesMockManager() throws Exception {
long scId = System.currentTimeMillis();
- StreamProperties props = StreamProperties.newBuilder(streamProps)
- .setStorageContainerId(scId)
- .build();
-
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(scId, scStore);
-
StorageContainerResponse resp = StorageContainerResponse.newBuilder()
.setCode(StatusCode.STREAM_NOT_FOUND)
.build();
StorageContainerRequest request = createGetActiveRangesRequest(scId, 34L);
- when(scStore.getActiveRanges(request))
+ when(mockRangeStoreService.getActiveRanges(request))
.thenReturn(CompletableFuture.completedFuture(resp));
- CompletableFuture<StorageContainerResponse> future =
- rangeStore.getActiveRanges(request);
- verify(scStore, times(1)).getActiveRanges(request);
+ CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ metaRangeService.getActiveRanges(request));
+ verify(mockRangeStoreService, times(1)).getActiveRanges(request);
assertTrue(resp == future.get());
}
@@ -477,8 +516,8 @@ public class TestRangeStoreImpl {
public void testPutNoStorageContainer() throws Exception {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
- verifyNotFoundException(
- rangeStore.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID)),
+ verifyNotFoundException(fromListenableFuture(
+ tableService.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID))),
Status.NOT_FOUND);
}
@@ -486,8 +525,8 @@ public class TestRangeStoreImpl {
public void testDeleteNoStorageContainer() throws Exception {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
- verifyNotFoundException(
- rangeStore.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID)),
+ verifyNotFoundException(fromListenableFuture(
+ tableService.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID))),
Status.NOT_FOUND);
}
@@ -495,59 +534,50 @@ public class TestRangeStoreImpl {
public void testRangeNoStorageContainer() throws Exception {
rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
- verifyNotFoundException(
- rangeStore.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID)),
+ verifyNotFoundException(fromListenableFuture(
+ tableService.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID))),
Status.NOT_FOUND);
}
@Test
public void testRangeMockStorageContainer() throws Exception {
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
StorageContainerResponse response = createRangeResponse(StatusCode.SUCCESS);
StorageContainerRequest request = createRangeRequest(ROOT_STORAGE_CONTAINER_ID);
- when(scStore.range(request))
+ when(mockRangeStoreService.range(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future =
- rangeStore.range(request);
- verify(scStore, times(1)).range(eq(request));
+ CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ tableService.range(request));
+ verify(mockRangeStoreService, times(1)).range(eq(request));
assertTrue(response == future.get());
}
@Test
public void testDeleteMockStorageContainer() throws Exception {
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
StorageContainerResponse response = createDeleteResponse(StatusCode.SUCCESS);
StorageContainerRequest request = createDeleteRequest(ROOT_STORAGE_CONTAINER_ID);
- when(scStore.delete(request))
+ when(mockRangeStoreService.delete(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future =
- rangeStore.delete(request);
- verify(scStore, times(1)).delete(eq(request));
+ CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ tableService.delete(request));
+ verify(mockRangeStoreService, times(1)).delete(eq(request));
assertTrue(response == future.get());
}
@Test
public void testPutMockStorageContainer() throws Exception {
- StorageContainer scStore = mock(StorageContainer.class);
- when(scStore.stop()).thenReturn(FutureUtils.value(null));
- rangeStore.getRegistry().setStorageContainer(ROOT_STORAGE_CONTAINER_ID, scStore);
StorageContainerResponse response = createPutResponse(StatusCode.SUCCESS);
StorageContainerRequest request = createPutRequest(ROOT_STORAGE_CONTAINER_ID);
- when(scStore.put(request))
+ when(mockRangeStoreService.put(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future =
- rangeStore.put(request);
- verify(scStore, times(1)).put(eq(request));
+ CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ tableService.put(request));
+ verify(mockRangeStoreService, times(1)).put(eq(request));
assertTrue(response == future.get());
}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 787453f..178ac19 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -32,7 +32,7 @@ import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.junit.Test;
/**
@@ -48,7 +48,7 @@ public class TestGrpcMetaRangeService {
@Test
public void testGetActiveRangesSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -79,7 +79,7 @@ public class TestGrpcMetaRangeService {
@Test
public void testGetActiveRangesFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -109,7 +109,7 @@ public class TestGrpcMetaRangeService {
@Test
public void testGetActiveRangesException() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
StorageContainerRequest request = StorageContainerRequest
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
index 7970793..8a6c7a9 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
@@ -70,8 +70,8 @@ import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.exceptions.StorageException;
-import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
import org.junit.Test;
/**
@@ -108,7 +108,7 @@ public class TestGrpcRootRangeService {
@Test
public void testCreateNamespaceSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -152,7 +152,7 @@ public class TestGrpcRootRangeService {
@Test
public void testCreateNamespaceFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
CreateNamespaceResponse createResp = CreateNamespaceResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -195,7 +195,7 @@ public class TestGrpcRootRangeService {
@Test
public void testDeleteNamespaceSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -237,7 +237,7 @@ public class TestGrpcRootRangeService {
@Test
public void testDeleteNamespaceFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
DeleteNamespaceResponse deleteResp = DeleteNamespaceResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -279,7 +279,7 @@ public class TestGrpcRootRangeService {
@Test
public void testGetNamespaceSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -322,7 +322,7 @@ public class TestGrpcRootRangeService {
@Test
public void testGetNamespaceFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
GetNamespaceResponse getResp = GetNamespaceResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -368,7 +368,7 @@ public class TestGrpcRootRangeService {
@Test
public void testCreateStreamSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -413,7 +413,7 @@ public class TestGrpcRootRangeService {
@Test
public void testCreateStreamFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
CreateStreamResponse createResp = CreateStreamResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -457,7 +457,7 @@ public class TestGrpcRootRangeService {
@Test
public void testDeleteStreamSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -500,7 +500,7 @@ public class TestGrpcRootRangeService {
@Test
public void testDeleteStreamFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
DeleteStreamResponse deleteResp = DeleteStreamResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
@@ -543,7 +543,7 @@ public class TestGrpcRootRangeService {
@Test
public void testGetStreamSuccess() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
GetStreamResponse getResp = GetStreamResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
@@ -588,7 +588,7 @@ public class TestGrpcRootRangeService {
@Test
public void testGetStreamFailure() throws Exception {
- RangeStoreImpl rangeService = mock(RangeStoreImpl.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcRootRangeService grpcService = new GrpcRootRangeService(rangeService);
GetStreamResponse getResp = GetStreamResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index ed97dc1..e943fb4 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.junit.Test;
/**
@@ -63,7 +63,7 @@ public class TestGrpcTableService {
@Test
public void testPutSuccess() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -95,7 +95,7 @@ public class TestGrpcTableService {
@Test
public void testPutFailure() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -126,7 +126,7 @@ public class TestGrpcTableService {
@Test
public void testPutException() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -153,7 +153,7 @@ public class TestGrpcTableService {
@Test
public void testRangeSuccess() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -184,7 +184,7 @@ public class TestGrpcTableService {
@Test
public void testRangeActiveRangesFailure() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -214,7 +214,7 @@ public class TestGrpcTableService {
@Test
public void testRangeActiveRangesException() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -240,7 +240,7 @@ public class TestGrpcTableService {
@Test
public void testDeleteSuccess() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -271,7 +271,7 @@ public class TestGrpcTableService {
@Test
public void testDeleteFailure() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
@@ -301,7 +301,7 @@ public class TestGrpcTableService {
@Test
public void testDeleteException() throws Exception {
- RangeStore rangeService = mock(RangeStore.class);
+ RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
StorageContainerRequest request = StorageContainerRequest
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
index ed87fa6..a56623b 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestDefaultStorageContainerFactory.java
@@ -16,23 +16,14 @@ package org.apache.bookkeeper.stream.storage.impl.sc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
-import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
import org.junit.Test;
-import org.mockito.Mockito;
/**
* Unit test for {@link DefaultStorageContainerFactory}.
@@ -40,23 +31,15 @@ import org.mockito.Mockito;
public class TestDefaultStorageContainerFactory {
@Test
- public void testCreate() throws Exception {
- OrderedScheduler scheduler = mock(OrderedScheduler.class);
- OrderedScheduler snapshotScheduler = mock(OrderedScheduler.class);
- MVCCStoreFactory storeFactory = mock(MVCCStoreFactory.class);
- ListeningScheduledExecutorService snapshotExecutor = mock(ListeningScheduledExecutorService.class);
- when(snapshotScheduler.chooseThread(anyLong())).thenReturn(snapshotExecutor);
- Mockito.doReturn(mock(ListenableScheduledFuture.class))
- .when(snapshotExecutor).scheduleWithFixedDelay(
- any(Runnable.class), anyInt(), anyInt(), any(TimeUnit.class));
+ public void testCreate() {
+ StorageContainerServiceFactory mockServiceFactory =
+ mock(StorageContainerServiceFactory.class);
+ StorageContainerService mockService = mock(StorageContainerService.class);
+ when(mockServiceFactory.createStorageContainerService(anyLong()))
+ .thenReturn(mockService);
- DefaultStorageContainerFactory factory = new DefaultStorageContainerFactory(
- new StorageConfiguration(new CompositeConfiguration()),
- (streamId, rangeId) -> streamId,
- scheduler,
- storeFactory,
- URI.create("distributedlog://127.0.0.1/stream/storage"));
+ DefaultStorageContainerFactory factory = new DefaultStorageContainerFactory(mockServiceFactory);
StorageContainer sc = factory.createStorageContainer(1234L);
assertTrue(sc instanceof StorageContainerImpl);
assertEquals(1234L, sc.getId());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
index 7031629..837dbe2 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
@@ -11,21 +11,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.bookkeeper.stream.storage.impl.sc;
import static org.junit.Assert.assertEquals;
@@ -81,7 +66,7 @@ public class TestStorageContainerRegistryImpl {
@Test
public void testOperationsAfterClosed() throws Exception {
StorageContainerFactory scFactory = createStorageContainerFactory();
- StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+ StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
registry.close();
long scId = 1234L;
@@ -106,7 +91,7 @@ public class TestStorageContainerRegistryImpl {
@Test
public void testStopNotFoundStorageContainer() throws Exception {
StorageContainerFactory scFactory = createStorageContainerFactory();
- StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+ StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
FutureUtils.result(registry.stopStorageContainer(1234L));
assertEquals(0, registry.getNumStorageContainers());
}
@@ -114,7 +99,7 @@ public class TestStorageContainerRegistryImpl {
@Test
public void testStartStorageContainerTwice() throws Exception {
StorageContainerFactory scFactory = createStorageContainerFactory();
- StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory, scheduler);
+ StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(scFactory);
FutureUtils.result(registry.startStorageContainer(1234L));
assertEquals(1, registry.getNumStorageContainers());
// second time
@@ -140,7 +125,7 @@ public class TestStorageContainerRegistryImpl {
long scId = 1L;
- StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(factory, scheduler);
+ StorageContainerRegistryImpl registry = new StorageContainerRegistryImpl(factory);
FutureUtils.result(registry.startStorageContainer(scId));
assertEquals(1, registry.getNumStorageContainers());
assertEquals(sc1, registry.getStorageContainer(scId));
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
index 193503b..3a5a4fd 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.testing.MoreAsserts;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
@@ -76,7 +75,6 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
private StorageContainerRegistry scRegistry;
private ZkClusterMetadataStore clusterMetadataStore;
private ZkStorageContainerManager scManager;
- private OrderedScheduler scheduler;
@Before
public void setup() {
@@ -89,13 +87,8 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
curatorClient, zkServers, "/" + runtime.getMethodName()));
clusterMetadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
- scheduler = OrderedScheduler.newSchedulerBuilder()
- .name("test-scheduler")
- .numThreads(1)
- .build();
-
mockScFactory = mock(StorageContainerFactory.class);
- scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+ scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory));
scManager = new ZkStorageContainerManager(
myEndpoint,
@@ -112,10 +105,6 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
scManager.close();
}
- if (null != scheduler) {
- scheduler.shutdown();
- }
-
if (null != curatorClient) {
curatorClient.close();
}
@@ -278,7 +267,7 @@ public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
);
}
};
- scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+ scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory));
scManager = new ZkStorageContainerManager(
myEndpoint,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
similarity index 98%
rename from stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
index e2a582b..a4d8710 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.bookkeeper.stream.storage.impl.sc;
+package org.apache.bookkeeper.stream.storage.impl.service;
import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
@@ -73,9 +73,9 @@ import org.junit.Before;
import org.junit.Test;
/**
- * Unit test of {@link StorageContainerImpl}.
+ * Unit test of {@link RangeStoreServiceImpl}.
*/
-public class StorageContainerImplTest {
+public class RangeStoreServiceImplTest {
private static final long SCID = 3456L;
private static final long STREAM_ID = 1234L;
@@ -86,7 +86,7 @@ public class StorageContainerImplTest {
private RootRangeStoreFactory rrStoreFactory;
private MetaRangeStoreFactory mrStoreFactory;
private TableStoreFactory tableStoreFactory;
- private StorageContainerImpl container;
+ private RangeStoreServiceImpl container;
private OrderedScheduler scheduler;
private RootRangeStore rrStore;
private MVCCAsyncStore<byte[], byte[]> rrMvccStore;
@@ -111,7 +111,7 @@ public class StorageContainerImplTest {
this.mrMvccStore = mock(MVCCAsyncStore.class);
this.trMvccStore = mock(MVCCAsyncStore.class);
- this.container = new StorageContainerImpl(
+ this.container = new RangeStoreServiceImpl(
SCID,
scheduler,
mvccStoreFactory,
@@ -179,7 +179,7 @@ public class StorageContainerImplTest {
public void testStartRootContainer() throws Exception {
mockStorageContainer(ROOT_STORAGE_CONTAINER_ID);
- StorageContainerImpl container = new StorageContainerImpl(
+ RangeStoreServiceImpl container = new RangeStoreServiceImpl(
ROOT_STORAGE_CONTAINER_ID,
scheduler,
mvccStoreFactory,
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.