You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/01 18:12:19 UTC
[bookkeeper] branch master updated: [TABLE SERVICE] [STORAGE] add
routing table for proxying table service requests
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3446c2c [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests
3446c2c is described below
commit 3446c2cdb9c49ab5f547657a5d12e92022b56721
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Oct 1 11:12:05 2018 -0700
[TABLE SERVICE] [STORAGE] add routing table for proxying table service requests
Descriptions of the changes in this PR:
*Motivation*
In order to implement non-Java clients and avoid complexity in the client implementation,
we need a routing table on the server side for proxying table service requests.
*Changes*
Add a routing table on the server side to proxy gRPC requests to the right storage containers.
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>, Enrico Olivelli <eo...@gmail.com>
This closes #1721 from sijie/add_routing_table
---
.../impl/internal/ProtocolInternalUtils.java | 2 +-
.../stream/protocol/ProtocolConstants.java | 22 +++
.../bookkeeper/stream/server/StorageServer.java | 18 +-
.../storage/api/sc/StorageContainerRegistry.java | 9 +
stream/storage/impl/pom.xml | 7 +
.../storage/StorageContainerStoreBuilder.java | 17 ++
.../storage/impl/StorageContainerStoreImpl.java | 96 +++++++++--
.../storage/impl/routing/RangeRoutingTable.java | 38 ++++
.../impl/routing/RangeRoutingTableImpl.java | 100 +++++++++++
.../routing/RoutingHeaderProxyInterceptor.java} | 133 +++++++-------
.../StorageContainerProxyChannelManager.java | 39 +++++
.../StorageContainerProxyChannelManagerImpl.java | 60 +++++++
.../stream/storage/impl/routing/package-info.java | 23 +++
.../impl/sc/StorageContainerRegistryImpl.java | 7 +-
.../storage/TestStorageContainerStoreBuilder.java | 19 ++
.../impl/TestStorageContainerStoreImpl.java | 3 +
.../impl/routing/RangeRoutingTableImplTest.java | 191 +++++++++++++++++++++
.../RoutingHeaderProxyInterceptorTest.java} | 19 +-
...torageContainerProxyChannelManagerImplTest.java | 111 ++++++++++++
19 files changed, 832 insertions(+), 82 deletions(-)
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
index a89aaf7..abc9c6d 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
@@ -49,7 +49,7 @@ public final class ProtocolInternalUtils {
private ProtocolInternalUtils() {
}
- static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) {
+ public static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) {
TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap();
long lastEndKey = Long.MIN_VALUE;
for (RelatedRanges rr : response.getRangesList()) {
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0088c4b..b888b45 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.stream.protocol;
import io.grpc.Metadata;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
import org.apache.bookkeeper.stream.proto.RangeKeyType;
import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -115,4 +117,24 @@ public final class ProtocolConstants {
public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX;
public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX;
public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX;
+
+ // the metadata keys in grpc call metadata
+ public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of(
+ SC_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+ RANGE_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+ STREAM_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+ ROUTING_KEY,
+ IdentityBinaryMarshaller.of()
+ );
+
+
}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 97ff66a..4385d56 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -25,6 +25,9 @@ import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
@@ -54,6 +57,7 @@ import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
@@ -249,6 +253,10 @@ public class StorageServer {
dlConf,
rootStatsLogger.scope("dlog"));
+ // client settings for the proxy channels
+ StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder()
+ .serviceUri("bk://localhost:" + grpcPort)
+ .build();
// Create range (stream) store
StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
@@ -286,7 +294,15 @@ public class StorageServer {
() -> new DLCheckpointStore(dlNamespaceProvider.get()),
storageConf.getRangeStoreDirs(),
storageResources,
- storageConf.getServeReadOnlyTables()));
+ storageConf.getServeReadOnlyTables()))
+ // with client manager for proxying grpc requests
+ .withStorageServerClientManager(() -> new StorageServerClientManagerImpl(
+ proxyClientSettings,
+ storageResources.scheduler(),
+ StorageServerChannel.factory(proxyClientSettings)
+ // intercept the channel to attach routing header
+ .andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor()))
+ ));
StorageService storageService = new StorageService(
storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index 57634d3..a051060 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -36,6 +36,15 @@ public interface StorageContainerRegistry extends AutoCloseable {
StorageContainer getStorageContainer(long storageContainerId);
/**
+ * Get the instance of storage container {@code storageContainerId}.
+ *
+ * @param storageContainerId storage container id
+ * @param defaultContainer the default container to return if the container doesn't exist in the registry
+ * @return the instance of the storage container.
+ */
+ StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer);
+
+ /**
* Start the storage container in this registry.
*
* @param scId storage container id
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index 52cef7f..a2ce3e0 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -66,6 +66,13 @@
<artifactId>stream-storage-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>stream-storage-java-client-base</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index 9c96139..cac363f 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -17,6 +17,8 @@ package org.apache.bookkeeper.stream.storage;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.URI;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -46,6 +48,7 @@ public final class StorageContainerStoreBuilder {
StorageContainerPlacementPolicyImpl.of(1024);
private MVCCStoreFactory mvccStoreFactory = null;
private URI defaultBackendUri = null;
+ private Supplier<StorageServerClientManager> clientManagerSupplier;
private StorageContainerStoreBuilder() {
}
@@ -131,12 +134,25 @@ public final class StorageContainerStoreBuilder {
return this;
}
+ /**
+ * Supplier to provide client manager for proxying requests.
+ *
+ * @param clientManagerSupplier client manager supplier
+ * @return storage container store builder
+ */
+ public StorageContainerStoreBuilder withStorageServerClientManager(
+ Supplier<StorageServerClientManager> clientManagerSupplier) {
+ this.clientManagerSupplier = clientManagerSupplier;
+ return this;
+ }
+
public StorageContainerStore build() {
checkNotNull(scmFactory, "StorageContainerManagerFactory is not provided");
checkNotNull(storeConf, "StorageConfiguration is not provided");
checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
checkNotNull(defaultBackendUri, "Default backend uri is not provided");
checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
+ checkNotNull(clientManagerSupplier, "Storage server client manager is not provided");
RangeStoreServiceFactoryImpl serviceFactory = new RangeStoreServiceFactoryImpl(
storeConf,
@@ -152,6 +168,7 @@ public final class StorageContainerStoreBuilder {
storeConf,
scmFactory,
containerServiceFactory,
+ clientManagerSupplier.get(),
statsLogger);
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
index 56fcba2..1271e85 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -15,15 +15,19 @@
package org.apache.bookkeeper.stream.storage.impl;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import java.io.IOException;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
@@ -31,11 +35,15 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl;
+import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager;
+import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
/**
- * KeyRange Service.
+ * Storage Container Store manages the containers and routes the requests accordingly.
*/
public class StorageContainerStoreImpl
extends AbstractLifecycleComponent<StorageConfiguration>
@@ -45,11 +53,14 @@ public class StorageContainerStoreImpl
private final StorageContainerRegistryImpl scRegistry;
private final StorageContainerManager scManager;
private final StorageContainerServiceFactory serviceFactory;
- private final Metadata.Key<Long> scIdKey;
+ private final StorageServerClientManager ssClientManager;
+ private final RangeRoutingTable routingTable;
+ private final StorageContainerProxyChannelManager proxyChannelManager;
public StorageContainerStoreImpl(StorageConfiguration conf,
StorageContainerManagerFactory managerFactory,
StorageContainerServiceFactory serviceFactory,
+ StorageServerClientManager ssClientManager,
StatsLogger statsLogger) {
super("range-service", conf, statsLogger);
this.scmFactory = managerFactory;
@@ -57,9 +68,14 @@ public class StorageContainerStoreImpl
new DefaultStorageContainerFactory(serviceFactory));
this.scManager = scmFactory.create(conf, scRegistry);
this.serviceFactory = serviceFactory;
- this.scIdKey = Metadata.Key.of(
- SC_ID_KEY,
- LongBinaryMarshaller.of());
+ this.ssClientManager = ssClientManager;
+ if (ssClientManager == null) {
+ this.proxyChannelManager = null;
+ this.routingTable = null;
+ } else {
+ this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(ssClientManager);
+ this.routingTable = new RangeRoutingTableImpl(ssClientManager);
+ }
}
@Override
@@ -83,6 +99,10 @@ public class StorageContainerStoreImpl
@Override
protected void doStop() {
+ // it doesn't have to be blocked at waiting closing proxy channels to be completed.
+ if (null != ssClientManager) {
+ this.ssClientManager.closeAsync();
+ }
this.scManager.stop();
this.scRegistry.close();
}
@@ -97,17 +117,69 @@ public class StorageContainerStoreImpl
return scRegistry.getStorageContainer(scId);
}
+ StorageContainer getStorageContainer(long scId, StorageContainer defaultContainer) {
+ return scRegistry.getStorageContainer(scId, defaultContainer);
+ }
+
//
// Utils for proxies
//
@Override
public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
- Long scId = headers.get(scIdKey);
- if (null == scId) {
- // use the invalid storage container id, so it will fail the request.
- scId = INVALID_STORAGE_CONTAINER_ID;
+ Long scId = headers.get(SCID_METADATA_KEY);
+ if (null != scId) {
+ // if a request is sent directly to a container, then find the container
+ StorageContainer container = getStorageContainer(scId, null);
+ if (null != container) {
+ return container.getChannel();
+ } else {
+ if (scId == 0L && null != proxyChannelManager) {
+ // root container, we attempt to always proxy the requests for root container
+ Channel channel = proxyChannelManager.getStorageContainerChannel(0L);
+ if (null != channel) {
+ return channel;
+ } else {
+ // no channel found to proxy the request, fail the request with 404 container
+ return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+ }
+ } else {
+ // no container is found and the scId is not the root container
+ // then fail the request with 404 container
+ return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+ }
+ }
+ } else {
+ // if a request is not sent directly to a container, then check if
+ // streamId + routingKey is attached in the header. if so, figure out
+ // which the range id and storage container id to route the request.
+ byte[] routingKey = headers.get(RK_METADATA_KEY);
+ Long streamId = headers.get(SID_METADATA_KEY);
+ if (null != routingKey && null != streamId && null != routingTable && null != proxyChannelManager) {
+ RangeProperties rangeProps = routingTable.getRange(streamId, routingKey);
+ if (null != rangeProps) {
+ long containerId = rangeProps.getStorageContainerId();
+ long rangeId = rangeProps.getRangeId();
+ // if we find the routing information, we can update the headers
+ headers.put(SCID_METADATA_KEY, containerId);
+ headers.put(RID_METADATA_KEY, rangeId);
+ // if we find the container id, we can check whether the container is owned locally.
+ // if so, forward the request to the container.
+ StorageContainer container = getStorageContainer(containerId, null);
+ if (null == container) {
+ // the container doesn't belong to here, then find the channel to forward the request
+ Channel channel = proxyChannelManager.getStorageContainerChannel(containerId);
+ if (null != channel) {
+ return channel;
+ }
+ } else {
+ // we found the container exists in the registry to serve the request
+ return container.getChannel();
+ }
+ }
+ }
+ // there is no storage container id or routing key, then fail the request and ask client to retry.
+ return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
}
- return getStorageContainer(scId).getChannel();
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
new file mode 100644
index 0000000..bb80142
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+
+/**
+ * A routing table for figuring the ranges to forward the requests.
+ */
+public interface RangeRoutingTable {
+
+ /**
+ * Get the range metadata for <tt>streamId</tt> and the routing key <tt>routingKey</tt>.
+ *
+ * @param streamId stream id
+ * @param routingKey routing key
+ * @return the range that serves this routing key
+ */
+ RangeProperties getRange(long streamId, byte[] routingKey);
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
new file mode 100644
index 0000000..b9727c9
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A default implementation of {@link RangeRoutingTable}.
+ */
+@Slf4j
+public class RangeRoutingTableImpl implements RangeRoutingTable {
+
+ private final StorageServerClientManager manager;
+ private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges;
+
+ // outstanding requests
+ private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates;
+
+ public RangeRoutingTableImpl(StorageServerClientManager manager) {
+ this.manager = manager;
+ this.ranges = new ConcurrentLongHashMap<>();
+ this.outstandingUpdates = new ConcurrentLongHashMap<>();
+ }
+
+ @VisibleForTesting
+ RangeRouter<byte[]> getRangeRouter(long streamId) {
+ return ranges.get(streamId);
+ }
+
+ @Override
+ public RangeProperties getRange(long streamId, byte[] routingKey) {
+ RangeRouter<byte[]> router = ranges.get(streamId);
+ if (null == router) {
+ // trigger to fetch stream metadata, but return `null` since
+ // the range router is not ready, let the client backoff and retry.
+ fetchStreamRanges(streamId);
+ return null;
+ } else {
+ return router.getRangeProperties(routingKey);
+ }
+ }
+
+ @VisibleForTesting
+ CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) {
+ return outstandingUpdates.get(streamId);
+ }
+
+ private void fetchStreamRanges(long streamId) {
+ if (null != outstandingUpdates.get(streamId)) {
+ // there is already an outstanding fetch request, do nothing
+ return;
+ }
+ final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>();
+ if (null != outstandingUpdates.put(streamId, newFetchFuture)) {
+ // some one already triggers the fetch
+ return;
+ }
+ FutureUtils.proxyTo(
+ manager.openMetaRangeClient(streamId)
+ .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges())
+ .thenApply(hashStreamRanges -> {
+ RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of());
+ router.setRanges(hashStreamRanges);
+ return router;
+ })
+ .whenComplete((router, cause) -> {
+ if (null == cause) {
+ ranges.put(streamId, router);
+ }
+ outstandingUpdates.remove(streamId, newFetchFuture);
+ }),
+ newFetchFuture
+ );
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
similarity index 70%
rename from stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
index 2ef970f..377f6df 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
@@ -34,6 +34,9 @@ import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -41,8 +44,6 @@ import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
@@ -50,25 +51,13 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.commons.codec.binary.Hex;
/**
* A client interceptor that intercepting kv rpcs to attach routing information.
*/
@Slf4j
-public class RoutingHeaderClientInterceptor implements ClientInterceptor {
-
- static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
- RANGE_ID_KEY,
- LongBinaryMarshaller.of()
- );
- static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
- STREAM_ID_KEY,
- LongBinaryMarshaller.of()
- );
- static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
- ROUTING_KEY,
- IdentityBinaryMarshaller.of()
- );
+public class RoutingHeaderProxyInterceptor implements ClientInterceptor {
/**
* Table request mutator that mutates a table service rpc request to attach
@@ -175,53 +164,75 @@ public class RoutingHeaderClientInterceptor implements ClientInterceptor {
CallOptions callOptions,
Channel next) {
if (log.isTraceEnabled()) {
- log.trace("Intercepting method {}", method.getFullMethodName());
+ log.trace("Intercepting method {} : req marshaller = {}, resp marshaller = {}",
+ method.getFullMethodName(),
+ method.getRequestMarshaller(),
+ method.getResponseMarshaller());
}
InterceptorDescriptor<?> descriptor = kvRpcMethods.get(method.getFullMethodName());
- if (null != descriptor) {
- return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
-
- private Long rid = null;
- private Long sid = null;
- private byte[] rk = null;
+ return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
- @Override
- public void start(Listener<RespT> responseListener, Metadata headers) {
- // capture routing information from headers
- sid = headers.get(SID_METADATA_KEY);
- rid = headers.get(RID_METADATA_KEY);
- rk = headers.get(RK_METADATA_KEY);
- if (log.isTraceEnabled()) {
- log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}",
- sid, rid, rk);
- }
+ private Long rid = null;
+ private Long sid = null;
+ private byte[] rk = null;
- delegate().start(responseListener, headers);
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ // capture routing information from headers
+ sid = headers.get(SID_METADATA_KEY);
+ rid = headers.get(RID_METADATA_KEY);
+ rk = headers.get(RK_METADATA_KEY);
+ if (log.isTraceEnabled()) {
+ log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}",
+ sid, rid, rk);
}
- @Override
- public void sendMessage(ReqT message) {
- ReqT interceptedMessage;
- if (null == rid || null == sid || null == rk) {
- // we don't have enough information to form the new routing header
- // so do nothing
- interceptedMessage = message;
- } else {
- interceptedMessage = interceptMessage(
- method,
- descriptor,
- message,
- sid,
- rid,
- rk
- );
- }
- delegate().sendMessage(interceptedMessage);
+ delegate().start(responseListener, headers);
+ }
+
+ @Override
+ public void sendMessage(ReqT message) {
+ ReqT interceptedMessage;
+ if (null == rid || null == sid || null == rk || null == descriptor) {
+ // we don't have enough information to form the new routing header
+ // we simply copy the bytes and generate a new message to forward
+ // the request payload
+ interceptedMessage = interceptMessage(method, message);
+ } else {
+ interceptedMessage = interceptMessage(
+ method,
+ descriptor,
+ message,
+ sid,
+ rid,
+ rk
+ );
}
- };
- } else {
- return next.newCall(method, callOptions);
+ delegate().sendMessage(interceptedMessage);
+ }
+ };
+ }
+
+ private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT> method, ReqT message) {
+ InputStream is = method.getRequestMarshaller().stream(message);
+ int bytes;
+ try {
+ bytes = is.available();
+ } catch (IOException e) {
+ log.warn("Encountered exceptions in getting available bytes of message", e);
+ throw new RuntimeException("Encountered exception in intercepting message", e);
+ }
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
+ try {
+ buffer.writeBytes(is, bytes);
+ } catch (IOException e) {
+ log.warn("Encountered exceptions in transferring bytes to the buffer", e);
+ buffer.release();
+ throw new RuntimeException("Encountered exceptions in transferring bytes to the buffer", e);
}
+ return method
+ .getRequestMarshaller()
+ .parse(new ByteBufInputStream(buffer, true));
}
private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
@@ -237,7 +248,9 @@ public class RoutingHeaderClientInterceptor implements ClientInterceptor {
} else {
try {
return interceptTableRequest(method, descriptor, message, sid, rid, rk);
- } catch (IOException ioe) {
+ } catch (Throwable t) {
+ log.error("Failed to intercept table request (sid = {}, rid = {}, rk = {}) : ",
+ sid, rid, Hex.encodeHexString(rk), t);
return message;
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
new file mode 100644
index 0000000..deb054e
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+
+/**
+ * A manager that manages all the proxy channels to other storage containers.
+ */
+public interface StorageContainerProxyChannelManager {
+
+ /**
+ * Get the channel to storage container {@code scId}.
+ *
+ * <p>The method can return {@code null} if the channel is not ready.
+ *
+ * @param scId storage container id
+ * @return channel to the given storage container, or {@code null} if it can't connect
+ */
+ Channel getStorageContainerChannel(long scId);
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
new file mode 100644
index 0000000..ed69e0f
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+
+/**
+ * Default implementation of {@link StorageContainerProxyChannelManager}.
+ */
+public class StorageContainerProxyChannelManagerImpl implements StorageContainerProxyChannelManager {
+
+    // we can ideally just talk to the location service directly.
+    // however currently storage container is separated from actual services to make a clean interface
+    // so for now proxy channel manager will be acting as a client to talk to its local server to
+    // get location related information
+    private final StorageServerClientManager ssClientManager;
+
+    public StorageContainerProxyChannelManagerImpl(StorageServerClientManager clientManager) {
+        this.ssClientManager = clientManager;
+    }
+
+    /**
+     * Non-blocking lookup: returns {@code null} while the channel is still being resolved
+     * or when resolving it failed, instead of blocking or throwing.
+     */
+    @Override
+    public Channel getStorageContainerChannel(long scId) {
+        StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId);
+        // this will trigger creating the channel for the first time
+        CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture();
+        if (null == serverChannelFuture || !serverChannelFuture.isDone()
+            || serverChannelFuture.isCompletedExceptionally()) {
+            // not resolved yet, or resolution failed/was cancelled: join() would throw here
+            return null;
+        }
+        StorageServerChannel serverChannel = serverChannelFuture.join();
+        return null == serverChannel ? null : serverChannel.getGrpcChannel();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
new file mode 100644
index 0000000..5d4437c
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes related to grpc request routing.
+ */
+package org.apache.bookkeeper.stream.storage.impl.routing;
\ No newline at end of file
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 48330b7..160d9a8 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -58,7 +58,12 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
@Override
public StorageContainer getStorageContainer(long storageContainerId) {
- return containers.getOrDefault(storageContainerId, StorageContainer404.of());
+ return getStorageContainer(storageContainerId, StorageContainer404.of());
+ }
+
+ @Override
+ public StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer) {
+ return containers.getOrDefault(storageContainerId, defaultContainer);
}
@Override
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 3d3a0ad..59d6a13 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.net.URI;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -47,6 +48,7 @@ public class TestStorageContainerStoreBuilder {
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
.withDefaultBackendUri(uri)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.build();
}
@@ -57,6 +59,7 @@ public class TestStorageContainerStoreBuilder {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(null)
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -68,6 +71,7 @@ public class TestStorageContainerStoreBuilder {
.withStorageContainerManagerFactory(null)
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -79,6 +83,7 @@ public class TestStorageContainerStoreBuilder {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(null)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -90,10 +95,23 @@ public class TestStorageContainerStoreBuilder {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.withDefaultBackendUri(null)
.build();
}
+    @Test(expected = NullPointerException.class)
+    public void testBuildStorageServerClientManager() {
+        // every other dependency is supplied; only the client manager supplier is null
+        StorageContainerStoreBuilder builder = StorageContainerStoreBuilder.newBuilder()
+            .withStorageConfiguration(mock(StorageConfiguration.class))
+            .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
+            .withStorageResources(StorageResources.create())
+            .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(null)
+            .withDefaultBackendUri(uri);
+        builder.build();
+    }
+
@Test
public void testBuild() {
StorageContainerStore storageContainerStore = StorageContainerStoreBuilder.newBuilder()
@@ -101,6 +119,7 @@ public class TestStorageContainerStoreBuilder {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() -> mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index b9d5e69..6ae3658 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRange
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -133,6 +134,7 @@ public class TestStorageContainerStoreImpl {
NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build();
+ private final StorageResources resources = StorageResources.create();
private RangeStoreService mockRangeStoreService;
private StorageContainerStoreImpl rangeStore;
private Server server;
@@ -230,6 +232,7 @@ public class TestStorageContainerStoreImpl {
(storeConf, rgRegistry)
-> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2),
new RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+ null,
NullStatsLogger.INSTANCE);
rangeStore.start();
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
new file mode 100644
index 0000000..8201ba7
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
+import org.apache.bookkeeper.stream.proto.storage.RelationType;
+import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RangeRoutingTable}.
+ */
+public class RangeRoutingTableImplTest extends GrpcClientTestBase {
+
+ private final long scId = 1234L;
+ private final long streamId = 123456L;
+ private GetActiveRangesResponse getActiveRangesResponse;
+ private CompletableFuture<GetActiveRangesResponse> responseSupplier;
+ private StreamProperties props;
+ private List<RangeProperties> rangeProps;
+ private RangeRoutingTableImpl routingTable;
+ private RangeRouter<byte[]> rangeRouter;
+
+ @Override
+ protected void doSetup() throws Exception {
+ this.props = StreamProperties.newBuilder()
+ .setStorageContainerId(scId)
+ .setStreamId(streamId)
+ .setStreamName("metaclient-stream")
+ .setStreamConf(StreamConfiguration.newBuilder().build())
+ .build();
+ this.rangeProps = ProtoUtils.split(
+ streamId,
+ 24,
+ 23456L,
+ StorageContainerPlacementPolicyImpl.of(4)
+ );
+ final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder = GetActiveRangesResponse.newBuilder();
+ for (RangeProperties range : rangeProps) {
+ RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder()
+ .setProps(range)
+ .setType(RelationType.PARENTS)
+ .addAllRelatedRanges(Collections.emptyList());
+ getActiveRangesResponseBuilder.addRanges(rrBuilder);
+ }
+ this.getActiveRangesResponse = getActiveRangesResponseBuilder
+ .setCode(StatusCode.SUCCESS)
+ .build();
+ RootRangeServiceImplBase rootRangeService = new RootRangeServiceImplBase() {
+ @Override
+ public void getStream(GetStreamRequest request,
+ StreamObserver<GetStreamResponse> responseObserver) {
+ responseObserver.onNext(GetStreamResponse.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setStreamProps(props)
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(rootRangeService);
+
+ this.responseSupplier = new CompletableFuture<>();
+ // register a meta range service that blocks in the handler until the test completes responseSupplier
+ MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
+ @Override
+ public void getActiveRanges(GetActiveRangesRequest request,
+ StreamObserver<GetActiveRangesResponse> responseObserver) {
+ try {
+ responseObserver.onNext(responseSupplier.get());
+ responseObserver.onCompleted();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ responseObserver.onError(e);
+ } catch (ExecutionException e) {
+ responseObserver.onError(e);
+ }
+ }
+ };
+ serviceRegistry.addService(metaRangeService);
+
+ this.routingTable = new RangeRoutingTableImpl(serverManager);
+ this.rangeRouter = new RangeRouter<>(BytesHashRouter.of());
+ this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse));
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ @Test
+ public void testGetRange() throws Exception {
+ String key = "foo";
+ byte[] keyBytes = key.getBytes(UTF_8);
+ RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes);
+ // the first get is expected to return null since nothing is cached for this stream yet
+ assertNull(rangeProps);
+ // the fetch request is outstanding
+ CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+ routingTable.getOutstandingFetchRequest(streamId);
+ assertNotNull(outstandingFetchFuture);
+ assertFalse(outstandingFetchFuture.isDone());
+
+ // complete the response supplier, so the fetch request can complete to update the cache
+ responseSupplier.complete(getActiveRangesResponse);
+
+ // wait until the range router is cached. NOTE(review): this polling loop has no timeout
+ while (null == routingTable.getRangeRouter(streamId)) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ // if the router is created, it should return the cached router
+ rangeProps = routingTable.getRange(streamId, keyBytes);
+ assertNotNull(rangeProps);
+ assertEquals(rangeRouter.getRangeProperties(keyBytes), rangeProps);
+ }
+
+ @Test
+ public void testGetRangeException() throws Exception {
+ String key = "foo";
+ byte[] keyBytes = key.getBytes(UTF_8);
+ RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes);
+ // the first get is expected to return null since nothing is cached for this stream yet
+ assertNull(rangeProps);
+ // the fetch request is outstanding
+ CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+ routingTable.getOutstandingFetchRequest(streamId);
+ assertNotNull(outstandingFetchFuture);
+ assertFalse(outstandingFetchFuture.isDone());
+
+ // fail the response supplier, so the outstanding fetch request is expected to fail as well
+ responseSupplier.completeExceptionally(new Exception("fetch failed"));
+
+ // wait until the fetch is done.
+ try {
+ outstandingFetchFuture.get();
+ fail("Fetch request should fail");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // once the fetch is done, nothing should be cached and the outstanding fetch request should be removed
+ assertNull(routingTable.getRangeRouter(streamId));
+ assertNull(routingTable.getOutstandingFetchRequest(streamId));
+ }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
similarity index 94%
rename from stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
index 745118b..43c060b 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
@@ -17,9 +17,12 @@
* under the License.
*/
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
@@ -54,10 +57,10 @@ import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.junit.Test;
/**
- * Unit test {@link RoutingHeaderClientInterceptor}.
+ * Unit test {@link RoutingHeaderProxyInterceptor}.
*/
@Slf4j
-public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase {
private final long streamId = 1234L;
private final long rangeId = 2345L;
@@ -136,11 +139,12 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
};
serviceRegistry.addService(tableService.bindService());
+
this.channel = new StorageServerChannel(
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
Optional.empty()
).intercept(
- new RoutingHeaderClientInterceptor(),
+ new RoutingHeaderProxyInterceptor(),
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
@@ -152,15 +156,15 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}",
streamId, rangeId, new String(routingKey, UTF_8));
headers.put(
- RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+ RID_METADATA_KEY,
rangeId
);
headers.put(
- RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+ SID_METADATA_KEY,
streamId
);
headers.put(
- RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+ RK_METADATA_KEY,
routingKey
);
delegate().start(responseListener, headers);
@@ -173,6 +177,7 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
@Override
protected void doTeardown() {
+ channel.close();
}
@Test
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
new file mode 100644
index 0000000..bb94843
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest;
+import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
+import org.junit.Test;
+
+/**
+ * Unit testing {@link StorageContainerProxyChannelManagerImpl}.
+ */
+public class StorageContainerProxyChannelManagerImplTest extends GrpcClientTestBase {
+
+ private final long scId = 1234L;
+ private StorageContainerProxyChannelManagerImpl proxyChannelManager;
+
+ @Override
+ protected void doSetup() throws Exception {
+ this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(serverManager);
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+
+ @Test
+ public void testGetStorageContainerChannel() throws Exception {
+ final CompletableFuture<GetStorageContainerEndpointRequest> receivedRequest = new CompletableFuture<>();
+ final CompletableFuture<GetStorageContainerEndpointResponse> responseSupplier = new CompletableFuture<>();
+ StorageContainerServiceImplBase scService = new StorageContainerServiceImplBase() {
+ @Override
+ public void getStorageContainerEndpoint(
+ GetStorageContainerEndpointRequest request,
+ StreamObserver<GetStorageContainerEndpointResponse> responseObserver) {
+ receivedRequest.complete(request);
+ try {
+ responseObserver.onNext(responseSupplier.get());
+ responseObserver.onCompleted();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ responseObserver.onError(e);
+ } catch (ExecutionException e) {
+ responseObserver.onError(e);
+ }
+ }
+ };
+ serviceRegistry.addService(scService.bindService());
+
+ Channel channel = proxyChannelManager.getStorageContainerChannel(scId);
+ // while the location service has not responded yet, the channel is expected to be null
+ assertNull(channel);
+
+ // complete the location service request, then poll for the channel. NOTE(review): the loop has no timeout
+ responseSupplier.complete(getResponse(receivedRequest.get()));
+ while ((channel = proxyChannelManager.getStorageContainerChannel(scId)) == null) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ assertNotNull(channel);
+ }
+
+ private static GetStorageContainerEndpointResponse getResponse(GetStorageContainerEndpointRequest request) {
+ GetStorageContainerEndpointResponse.Builder respBuilder =
+ GetStorageContainerEndpointResponse.newBuilder();
+ respBuilder.setStatusCode(StatusCode.SUCCESS);
+ for (OneStorageContainerEndpointRequest oneReq : request.getRequestsList()) {
+ OneStorageContainerEndpointResponse oneResp = OneStorageContainerEndpointResponse.newBuilder()
+ .setEndpoint(StorageContainerEndpoint.newBuilder()
+ .setStorageContainerId(oneReq.getStorageContainer())
+ .setRevision(oneReq.getRevision() + 1)
+ .setRwEndpoint(ENDPOINT))
+ .build();
+ respBuilder.addResponses(oneResp);
+ }
+ return respBuilder.build();
+ }
+
+
+
+}