You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/03 08:32:04 UTC
[bookkeeper] branch master updated: [TABLE SERVICE] [STORAGE]
improve the logic on creating the meta range for a table
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 897c643 [TABLE SERVICE] [STORAGE] improve the logic on creating the meta range for a table
897c643 is described below
commit 897c643e4248fc9fd716c3124b007aa8c540b6ff
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 01:31:58 2018 -0700
[TABLE SERVICE] [STORAGE] improve the logic on creating the meta range for a table
Descriptions of the changes in this PR:
*Motivation*
Currently the creation of meta range for table is deferred until we first try to use it.
The propagation of stream properties is carried by the client.
*Changes*
Improve the creation logic to fetch stream properties from the root range.
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1730 from sijie/cache_stream_props
---
.../stream/storage/StorageContainerStoreBuilder.java | 2 +-
.../stream/storage/impl/metadata/MetaRangeStoreImpl.java | 13 +++++++++----
.../stream/storage/impl/metadata/RootRangeStoreImpl.java | 6 +-----
.../storage/impl/service/RangeStoreServiceFactoryImpl.java | 11 +++++------
.../stream/storage/impl/service/RangeStoreServiceImpl.java | 12 +++++-------
.../storage/impl/metadata/MetaRangeStoreImplTest.java | 14 ++++++++++----
.../storage/impl/metadata/TestRootRangeStoreImpl.java | 4 ----
7 files changed, 31 insertions(+), 31 deletions(-)
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index cac363f..9dda002 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -159,7 +159,7 @@ public final class StorageContainerStoreBuilder {
placementPolicyFactory.newPlacementPolicy(),
storeResources.scheduler(),
mvccStoreFactory,
- defaultBackendUri);
+ clientManagerSupplier.get());
RangeStoreContainerServiceFactoryImpl containerServiceFactory =
new RangeStoreContainerServiceFactoryImpl(serviceFactory);
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 530d4cd..b64d320 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.RangeMetadata;
@@ -48,14 +49,17 @@ public class MetaRangeStoreImpl
private final ScheduledExecutorService executor;
private final StorageContainerPlacementPolicy rangePlacementPolicy;
private final Map<Long, MetaRangeImpl> streams;
+ private final StorageServerClientManager clientManager;
public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
StorageContainerPlacementPolicy rangePlacementPolicy,
- ScheduledExecutorService executor) {
+ ScheduledExecutorService executor,
+ StorageServerClientManager clientManager) {
this.store = store;
this.executor = executor;
this.rangePlacementPolicy = rangePlacementPolicy;
this.streams = Maps.newHashMap();
+ this.clientManager = clientManager;
}
//
@@ -100,9 +104,10 @@ public class MetaRangeStoreImpl
return metaRangeImpl.load(streamId)
.thenCompose(mr -> {
if (null == mr) {
- StreamProperties streamProps = request.hasStreamProps()
- ? request.getStreamProps() : null;
- return createStreamIfMissing(streamId, metaRangeImpl, streamProps);
+ // meta range doesn't exist, talk to root range to get the stream props
+ return clientManager.getStreamProperties(streamId)
+ .thenCompose(streamProperties ->
+ createStreamIfMissing(streamId, metaRangeImpl, streamProperties));
} else {
synchronized (streams) {
streams.put(streamId, (MetaRangeImpl) mr);
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 2f2c404..c066bd1 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -24,7 +24,6 @@ import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateName
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateStreamName;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
@@ -145,16 +144,13 @@ public class RootRangeStoreImpl
return streamIdBytes;
}
- private final URI defaultServiceUri;
private final MVCCAsyncStore<byte[], byte[]> store;
private final StorageContainerPlacementPolicy placementPolicy;
private final ScheduledExecutorService executor;
- public RootRangeStoreImpl(URI defaultServiceUri,
- MVCCAsyncStore<byte[], byte[]> store,
+ public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
StorageContainerPlacementPolicy placementPolicy,
ScheduledExecutorService executor) {
- this.defaultServiceUri = defaultServiceUri;
this.store = store;
this.placementPolicy = placementPolicy;
this.executor = executor;
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
index 02dd8a3..937a9fa 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -18,7 +18,7 @@
package org.apache.bookkeeper.stream.storage.impl.service;
-import java.net.URI;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
@@ -38,30 +38,29 @@ public class RangeStoreServiceFactoryImpl implements RangeStoreServiceFactory {
private final Resource<OrderedScheduler> schedulerResource;
private final OrderedScheduler scheduler;
private final MVCCStoreFactory storeFactory;
- private final URI defaultBackendUri;
+ private final StorageServerClientManager clientManager;
public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
StorageContainerPlacementPolicy rangePlacementPolicy,
Resource<OrderedScheduler> schedulerResource,
MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
+ StorageServerClientManager clientManager) {
this.storageConf = storageConf;
this.rangePlacementPolicy = rangePlacementPolicy;
this.schedulerResource = schedulerResource;
this.scheduler = SharedResourceManager.shared().get(schedulerResource);
this.storeFactory = storeFactory;
- this.defaultBackendUri = defaultBackendUri;
+ this.clientManager = clientManager;
}
@Override
public RangeStoreService createService(long scId) {
return new RangeStoreServiceImpl(
- storageConf,
scId,
rangePlacementPolicy,
scheduler,
storeFactory,
- defaultBackendUri);
+ clientManager);
}
@Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 7adcefd..e3b2d39 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -25,12 +25,12 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORA
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
import com.google.common.collect.Lists;
-import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
@@ -64,7 +64,6 @@ import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
@@ -100,19 +99,18 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
@Getter(value = AccessLevel.PACKAGE)
private final TableStoreFactory tableStoreFactory;
- RangeStoreServiceImpl(StorageConfiguration storageConf,
- long scId,
+ RangeStoreServiceImpl(long scId,
StorageContainerPlacementPolicy rangePlacementPolicy,
OrderedScheduler scheduler,
MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
+ StorageServerClientManager clientManager) {
this(
scId,
scheduler,
storeFactory,
store -> new RootRangeStoreImpl(
- defaultBackendUri, store, rangePlacementPolicy, scheduler.chooseThread(scId)),
- store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+ store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+ store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId), clientManager),
store -> new TableStoreImpl(store));
}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d0156c5..5e5f4b3 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -20,12 +20,16 @@ package org.apache.bookkeeper.stream.storage.impl.metadata;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.stream.LongStream;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.bookkeeper.stream.proto.RangeState;
@@ -48,6 +52,7 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
private StreamProperties streamProps;
private MetaRangeStoreImpl mrStoreImpl;
+ private StorageServerClientManager clientManager;
@Override
protected void doSetup() throws Exception {
@@ -57,10 +62,12 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
.setStreamName(name.getMethodName() + "_stream")
.setStreamId(System.currentTimeMillis())
.build();
+ this.clientManager = mock(StorageServerClientManager.class);
this.mrStoreImpl = new MetaRangeStoreImpl(
this.store,
StorageContainerPlacementPolicyImpl.of(1024),
- this.scheduler.chooseThread());
+ this.scheduler.chooseThread(),
+ clientManager);
}
@Override
@@ -68,11 +75,10 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
}
GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
+ when(clientManager.getStreamProperties(eq(this.streamProps.getStreamId())))
+ .thenReturn(FutureUtils.value(streamProperties));
GetActiveRangesRequest.Builder reqBuilder = GetActiveRangesRequest.newBuilder()
.setStreamId(this.streamProps.getStreamId());
- if (null != streamProperties) {
- reqBuilder = reqBuilder.setStreamProps(streamProperties);
- }
return reqBuilder.build();
}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index da3b6d1..9cdbf64 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -33,7 +33,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.net.URI;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -57,8 +56,6 @@ import org.junit.Test;
@Slf4j
public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
- private static final String DEFAULT_SERVICE_URI = "distributedlog://127.0.0.1/stream/storage";
-
private final NamespaceConfiguration namespaceConf =
NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
@@ -74,7 +71,6 @@ public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
@Override
protected void doSetup() throws Exception {
rootRangeStore = new RootRangeStoreImpl(
- URI.create(DEFAULT_SERVICE_URI),
store,
StorageContainerPlacementPolicyImpl.of(1024),
scheduler.chooseThread());