You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/01 16:27:46 UTC

[bookkeeper] branch master updated: ISSUE #2020: close db properly to avoid open RocksDB failure at the second time

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 751e55f  ISSUE #2020: close db properly to avoid open RocksDB failure at the second time
751e55f is described below

commit 751e55fa433172422fb82556ff748c5f6f2bfc41
Author: Arvin <ar...@gmail.com>
AuthorDate: Tue Apr 2 00:27:42 2019 +0800

    ISSUE #2020: close db properly to avoid open RocksDB failure at the second time
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    If the resources of a failed or closed asyncStore are not released, creating a new store with the same identifier will fail, mainly because of a RocksDBException, as #2020 shows.
    
    ### Changes
    
    Add `scStores` to the factory's `stores` instance variable in the `addStore` method of the `MVCCStoreFactoryImpl` class.
    Release the store when opening it fails.
    
    Descriptions of the changes in this PR:
    
    Master Issue: #2020
    
    
    
    Reviewers: Jia Zhai <zh...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #2022 from ArvinDevel/issue2020, closes #2020
---
 .../storage/impl/sc/StorageContainerRegistryImpl.java    |  5 +++++
 .../stream/storage/impl/store/MVCCStoreFactoryImpl.java  | 16 +++++++++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 160d9a8..4208b1a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -101,6 +101,11 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
                     } else {
                         log.warn("Fail to de-register StorageContainer ('{}') when failed to start", scId, cause);
                     }
+                    log.info("Release resources hold by StorageContainer ('{}') during de-register", scId);
+                    newStorageContainer.stop().exceptionally(throwable -> {
+                        log.error("Stop StorageContainer ('{}') fail during de-register", scId);
+                        return null;
+                    });
                 } else {
                     log.info("Successfully started registered StorageContainer ('{}').", scId);
                 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 6f01d4b..e3d0a55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -130,6 +130,7 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
         Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId);
         if (null == scStores) {
             scStores = Maps.newHashMap();
+            stores.putIfAbsent(scId, scStores);
         }
         RangeId rid = RangeId.of(streamId, rangeId);
         MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid);
@@ -207,7 +208,18 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
             .isReadonly(serveReadOnlyTable)
             .build();
 
-        return store.init(spec).thenApply(ignored -> {
+        return store.init(spec).whenComplete((ignored, throwable) -> {
+            // since the store has not been added, so can't release its resources during close sc
+            if (null != throwable) {
+                log.info("Clearing resources hold by stream({})/range({}) at storage container ({}) ",
+                    streamId, rangeId, scId);
+                store.closeAsync().whenComplete((i, t) -> {
+                    if (null != t) {
+                        log.error("Clear resources hold by {} fail", store.name());
+                    }
+                });
+            }
+        }).thenApply(ignored -> {
             log.info("Successfully initialize stream({})/range({}) at storage container ({})",
                 streamId, rangeId, scId);
             addStore(scId, streamId, rangeId, store);
@@ -222,11 +234,13 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
             scStores = stores.remove(scId);
         }
         if (null == scStores) {
+            log.info("scStores for {} on store factory is null, return directly", scId);
             return FutureUtils.Void();
         }
 
         List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
         for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
+            log.info("Closing {} of sc {}", store.name(), scId);
             closeFutures.add(store.closeAsync());
         }