You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/05/03 16:55:17 UTC
[bookkeeper] branch branch-4.9 updated: ISSUE #2020: close db
properly to avoid open RocksDB failure at the second time
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.9
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.9 by this push:
new fe0e0d8 ISSUE #2020: close db properly to avoid open RocksDB failure at the second time
fe0e0d8 is described below
commit fe0e0d88b4f36e25aebd81c5b3596cbcd2c2d21e
Author: Arvin <ar...@gmail.com>
AuthorDate: Tue Apr 2 00:27:42 2019 +0800
ISSUE #2020: close db properly to avoid open RocksDB failure at the second time
Descriptions of the changes in this PR:
### Motivation
If not releasing resources of failed/closed asyncStore, new creating of the same store identifier will fail, mainly caused by RocksDBException, like #2020 shows.
### Changes
add scStores to factory's instance variable at the `addstore` method of `MVCCStoreFactoryImpl` class;
release store when open fail;
Descriptions of the changes in this PR:
Master Issue: #2020
Reviewers: Jia Zhai <zh...@apache.org>, Sijie Guo <si...@apache.org>
This closes #2022 from ArvinDevel/issue2020, closes #2020
---
.../storage/impl/sc/StorageContainerRegistryImpl.java | 5 +++++
.../stream/storage/impl/store/MVCCStoreFactoryImpl.java | 16 +++++++++++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 160d9a8..4208b1a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -101,6 +101,11 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
} else {
log.warn("Fail to de-register StorageContainer ('{}') when failed to start", scId, cause);
}
+ log.info("Release resources hold by StorageContainer ('{}') during de-register", scId);
+ newStorageContainer.stop().exceptionally(throwable -> {
+ log.error("Stop StorageContainer ('{}') fail during de-register", scId);
+ return null;
+ });
} else {
log.info("Successfully started registered StorageContainer ('{}').", scId);
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 6f01d4b..e3d0a55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -130,6 +130,7 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId);
if (null == scStores) {
scStores = Maps.newHashMap();
+ stores.putIfAbsent(scId, scStores);
}
RangeId rid = RangeId.of(streamId, rangeId);
MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid);
@@ -207,7 +208,18 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
.isReadonly(serveReadOnlyTable)
.build();
- return store.init(spec).thenApply(ignored -> {
+ return store.init(spec).whenComplete((ignored, throwable) -> {
+ // since the store has not been added, so can't release its resources during close sc
+ if (null != throwable) {
+ log.info("Clearing resources hold by stream({})/range({}) at storage container ({}) ",
+ streamId, rangeId, scId);
+ store.closeAsync().whenComplete((i, t) -> {
+ if (null != t) {
+ log.error("Clear resources hold by {} fail", store.name());
+ }
+ });
+ }
+ }).thenApply(ignored -> {
log.info("Successfully initialize stream({})/range({}) at storage container ({})",
streamId, rangeId, scId);
addStore(scId, streamId, rangeId, store);
@@ -222,11 +234,13 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
scStores = stores.remove(scId);
}
if (null == scStores) {
+ log.info("scStores for {} on store factory is null, return directly", scId);
return FutureUtils.Void();
}
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
+ log.info("Closing {} of sc {}", store.name(), scId);
closeFutures.add(store.closeAsync());
}