You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/03 08:32:04 UTC

[bookkeeper] branch master updated: [TABLE SERVICE] [STORAGE] improve the logic on creating the meta range for a table

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 897c643  [TABLE SERVICE] [STORAGE] improve the logic on creating the meta range for a table
897c643 is described below

commit 897c643e4248fc9fd716c3124b007aa8c540b6ff
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 01:31:58 2018 -0700

    [TABLE SERVICE] [STORAGE] improve the logic on creating the meta range for a table
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Currently, the creation of the meta range for a table is deferred until the table is first used,
    and the stream properties needed to create it have to be supplied by the client in its request.
    
    *Changes*
    
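    Improve the creation logic so that, when the meta range does not exist yet, the storage server
    fetches the stream properties from the root range itself instead of relying on the client to send them.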
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1730 from sijie/cache_stream_props
---
 .../stream/storage/StorageContainerStoreBuilder.java       |  2 +-
 .../stream/storage/impl/metadata/MetaRangeStoreImpl.java   | 13 +++++++++----
 .../stream/storage/impl/metadata/RootRangeStoreImpl.java   |  6 +-----
 .../storage/impl/service/RangeStoreServiceFactoryImpl.java | 11 +++++------
 .../stream/storage/impl/service/RangeStoreServiceImpl.java | 12 +++++-------
 .../storage/impl/metadata/MetaRangeStoreImplTest.java      | 14 ++++++++++----
 .../storage/impl/metadata/TestRootRangeStoreImpl.java      |  4 ----
 7 files changed, 31 insertions(+), 31 deletions(-)

diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index cac363f..9dda002 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -159,7 +159,7 @@ public final class StorageContainerStoreBuilder {
             placementPolicyFactory.newPlacementPolicy(),
             storeResources.scheduler(),
             mvccStoreFactory,
-            defaultBackendUri);
+            clientManagerSupplier.get());
 
         RangeStoreContainerServiceFactoryImpl containerServiceFactory =
             new RangeStoreContainerServiceFactoryImpl(serviceFactory);
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 530d4cd..b64d320 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
@@ -48,14 +49,17 @@ public class MetaRangeStoreImpl
     private final ScheduledExecutorService executor;
     private final StorageContainerPlacementPolicy rangePlacementPolicy;
     private final Map<Long, MetaRangeImpl> streams;
+    private final StorageServerClientManager clientManager;
 
     public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                               StorageContainerPlacementPolicy rangePlacementPolicy,
-                              ScheduledExecutorService executor) {
+                              ScheduledExecutorService executor,
+                              StorageServerClientManager clientManager) {
         this.store = store;
         this.executor = executor;
         this.rangePlacementPolicy = rangePlacementPolicy;
         this.streams = Maps.newHashMap();
+        this.clientManager = clientManager;
     }
 
     //
@@ -100,9 +104,10 @@ public class MetaRangeStoreImpl
             return metaRangeImpl.load(streamId)
                 .thenCompose(mr -> {
                     if (null == mr) {
-                        StreamProperties streamProps = request.hasStreamProps()
-                            ? request.getStreamProps() : null;
-                        return createStreamIfMissing(streamId, metaRangeImpl, streamProps);
+                        // meta range doesn't exist, talk to root range to get the stream props
+                        return clientManager.getStreamProperties(streamId)
+                            .thenCompose(streamProperties ->
+                                createStreamIfMissing(streamId, metaRangeImpl, streamProperties));
                     } else {
                         synchronized (streams) {
                             streams.put(streamId, (MetaRangeImpl) mr);
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 2f2c404..c066bd1 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -24,7 +24,6 @@ import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateName
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateStreamName;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
@@ -145,16 +144,13 @@ public class RootRangeStoreImpl
         return streamIdBytes;
     }
 
-    private final URI defaultServiceUri;
     private final MVCCAsyncStore<byte[], byte[]> store;
     private final StorageContainerPlacementPolicy placementPolicy;
     private final ScheduledExecutorService executor;
 
-    public RootRangeStoreImpl(URI defaultServiceUri,
-                              MVCCAsyncStore<byte[], byte[]> store,
+    public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                               StorageContainerPlacementPolicy placementPolicy,
                               ScheduledExecutorService executor) {
-        this.defaultServiceUri = defaultServiceUri;
         this.store = store;
         this.placementPolicy = placementPolicy;
         this.executor = executor;
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
index 02dd8a3..937a9fa 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -18,7 +18,7 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import java.net.URI;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.SharedResourceManager;
 import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
@@ -38,30 +38,29 @@ public class RangeStoreServiceFactoryImpl implements RangeStoreServiceFactory {
     private final Resource<OrderedScheduler> schedulerResource;
     private final OrderedScheduler scheduler;
     private final MVCCStoreFactory storeFactory;
-    private final URI defaultBackendUri;
+    private final StorageServerClientManager clientManager;
 
     public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
                                         StorageContainerPlacementPolicy rangePlacementPolicy,
                                         Resource<OrderedScheduler> schedulerResource,
                                         MVCCStoreFactory storeFactory,
-                                        URI defaultBackendUri) {
+                                        StorageServerClientManager clientManager) {
         this.storageConf = storageConf;
         this.rangePlacementPolicy = rangePlacementPolicy;
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.storeFactory = storeFactory;
-        this.defaultBackendUri = defaultBackendUri;
+        this.clientManager = clientManager;
     }
 
     @Override
     public RangeStoreService createService(long scId) {
         return new RangeStoreServiceImpl(
-            storageConf,
             scId,
             rangePlacementPolicy,
             scheduler,
             storeFactory,
-            defaultBackendUri);
+            clientManager);
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 7adcefd..e3b2d39 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -25,12 +25,12 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORA
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
 
 import com.google.common.collect.Lists;
-import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
@@ -64,7 +64,6 @@ import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
@@ -100,19 +99,18 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
     @Getter(value = AccessLevel.PACKAGE)
     private final TableStoreFactory tableStoreFactory;
 
-    RangeStoreServiceImpl(StorageConfiguration storageConf,
-                          long scId,
+    RangeStoreServiceImpl(long scId,
                           StorageContainerPlacementPolicy rangePlacementPolicy,
                           OrderedScheduler scheduler,
                           MVCCStoreFactory storeFactory,
-                          URI defaultBackendUri) {
+                          StorageServerClientManager clientManager) {
         this(
             scId,
             scheduler,
             storeFactory,
             store -> new RootRangeStoreImpl(
-                defaultBackendUri, store, rangePlacementPolicy, scheduler.chooseThread(scId)),
-            store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+                store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+            store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, scheduler.chooseThread(scId), clientManager),
             store -> new TableStoreImpl(store));
     }
 
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d0156c5..5e5f4b3 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -20,12 +20,16 @@ package org.apache.bookkeeper.stream.storage.impl.metadata;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.stream.LongStream;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
 import org.apache.bookkeeper.stream.proto.RangeState;
@@ -48,6 +52,7 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
 
     private StreamProperties streamProps;
     private MetaRangeStoreImpl mrStoreImpl;
+    private StorageServerClientManager clientManager;
 
     @Override
     protected void doSetup() throws Exception {
@@ -57,10 +62,12 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
             .setStreamName(name.getMethodName() + "_stream")
             .setStreamId(System.currentTimeMillis())
             .build();
+        this.clientManager = mock(StorageServerClientManager.class);
         this.mrStoreImpl = new MetaRangeStoreImpl(
             this.store,
             StorageContainerPlacementPolicyImpl.of(1024),
-            this.scheduler.chooseThread());
+            this.scheduler.chooseThread(),
+            clientManager);
     }
 
     @Override
@@ -68,11 +75,10 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
     }
 
     GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
+        when(clientManager.getStreamProperties(eq(this.streamProps.getStreamId())))
+            .thenReturn(FutureUtils.value(streamProperties));
         GetActiveRangesRequest.Builder reqBuilder = GetActiveRangesRequest.newBuilder()
             .setStreamId(this.streamProps.getStreamId());
-        if (null != streamProperties) {
-            reqBuilder = reqBuilder.setStreamProps(streamProperties);
-        }
         return reqBuilder.build();
     }
 
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index da3b6d1..9cdbf64 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -33,7 +33,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -57,8 +56,6 @@ import org.junit.Test;
 @Slf4j
 public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
 
-    private static final String DEFAULT_SERVICE_URI = "distributedlog://127.0.0.1/stream/storage";
-
     private final NamespaceConfiguration namespaceConf =
         NamespaceConfiguration.newBuilder()
             .setDefaultStreamConf(DEFAULT_STREAM_CONF)
@@ -74,7 +71,6 @@ public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
     @Override
     protected void doSetup() throws Exception {
         rootRangeStore = new RootRangeStoreImpl(
-            URI.create(DEFAULT_SERVICE_URI),
             store,
             StorageContainerPlacementPolicyImpl.of(1024),
             scheduler.chooseThread());