You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/11/23 11:40:15 UTC
flink git commit: [FLIN-6505] Proactively cleanup local FS for
RocksDBKeyedStateBackend on startup
Repository: flink
Updated Branches:
refs/heads/master 200612ee0 -> ccf917de2
[FLIN-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf917de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf917de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf917de
Branch: refs/heads/master
Commit: ccf917de23ac94b032da11fb536d778f0566792f
Parents: 200612e
Author: Bowen Li <bo...@gmail.com>
Authored: Mon Oct 9 22:31:17 2017 -0700
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Nov 23 12:38:58 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 28 ++++++++++----------
1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf917de/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f67daab..9185ad0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
- if (!instanceBasePath.exists()) {
- if (!instanceBasePath.mkdirs()) {
- throw new IOException("Could not create RocksDB data directory.");
- }
+ if (instanceBasePath.exists()) {
+ // Clear the base directory when the backend is created
+ // in case something crashed and the backend never reached dispose()
+ cleanInstanceBasePath();
}
- // clean it, this will remove the last part of the path but RocksDB will recreate it
- try {
- if (instanceRocksDBPath.exists()) {
- LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
- FileUtils.deleteDirectory(instanceRocksDBPath);
- }
- } catch (IOException e) {
- throw new IOException("Error cleaning RocksDB data directory.", e);
+ if (!instanceBasePath.mkdirs()) {
+ throw new IOException("Could not create RocksDB data directory.");
}
this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);
+ cleanInstanceBasePath();
+ }
+
+ private void cleanInstanceBasePath() {
+ LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
+
try {
FileUtils.deleteDirectory(instanceBasePath);
- } catch (IOException ioex) {
- LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex);
+ } catch (IOException ex) {
+ LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
}
}