You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:23 UTC
[06/17] flink git commit: [hotfix] [rocksdb] Clean up RocksDB state
backend code
[hotfix] [rocksdb] Clean up RocksDB state backend code
- arrange variables to properly express configuration (client side) versus runtime (task manager side)
- make all runtime-only fields properly transient
- fix confusing variable name for local directories
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1931993b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1931993b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1931993b
Branch: refs/heads/master
Commit: 1931993bdc1d294a0eb9e1ad727f737cf64fe150
Parents: fa03e78
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 26 14:55:28 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 73 ++++++++++----------
1 file changed, 37 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1931993b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 79771f3..a6552bc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -36,9 +36,12 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -49,7 +52,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A State Backend that stores its state in {@code RocksDB}. This state backend can
@@ -76,42 +79,40 @@ public class RocksDBStateBackend extends AbstractStateBackend {
private static boolean rocksDbInitialized = false;
// ------------------------------------------------------------------------
- // Static configuration values
- // ------------------------------------------------------------------------
+
+ // -- configuration values, set in the application / configuration
/** The state backend that we use for creating checkpoint streams. */
private final AbstractStateBackend checkpointStreamBackend;
- /** Operator identifier that is used to uniquify the RocksDB storage path. */
- private String operatorIdentifier;
-
- /** JobID for uniquifying backup paths. */
- private JobID jobId;
-
- // DB storage directories
-
/** Base paths for RocksDB directory, as configured. May be null. */
- private Path[] configuredDbBasePaths;
-
- /** Base paths for RocksDB directory, as initialized. */
- private File[] initializedDbBasePaths;
-
- private int nextDirectory;
-
- // RocksDB options
+ @Nullable
+ private Path[] localRocksDbDirectories;
/** The pre-configured option settings. */
private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
/** The options factory to create the RocksDB options in the cluster. */
+ @Nullable
private OptionsFactory optionsFactory;
- /** Whether we already lazily initialized our local storage directories. */
- private transient boolean isInitialized = false;
-
/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
+ // -- runtime values, set on TaskManager when initializing / using the backend
+
+ /** Base paths for RocksDB directory, as initialized. */
+ private transient File[] initializedDbBasePaths;
+
+ /** JobID for uniquifying backup paths. */
+ private transient JobID jobId;
+
+ private transient int nextDirectory;
+
+ /** Whether we already lazily initialized our local storage directories. */
+ private transient boolean isInitialized;
+
+ // ------------------------------------------------------------------------
/**
* Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
@@ -190,7 +191,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* @param checkpointStreamBackend The backend to store the
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
- this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
+ this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
}
/**
@@ -202,10 +203,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
*
* @param checkpointStreamBackend The backend to store the
- * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled
+ * @param enableIncrementalCheckpointing True if incremental checkponting is enabled
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
- this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
+ this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
}
@@ -221,19 +222,18 @@ public class RocksDBStateBackend extends AbstractStateBackend {
return;
}
- this.operatorIdentifier = operatorIdentifier.replace(" ", "");
this.jobId = env.getJobID();
// initialize the paths where the local RocksDB files should be stored
- if (configuredDbBasePaths == null) {
+ if (localRocksDbDirectories == null) {
// initialize from the temp directories
initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
}
else {
- List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
+ List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
String errorMessage = "";
- for (Path path : configuredDbBasePaths) {
+ for (Path path : localRocksDbDirectories) {
File f = new File(path.toUri().getPath());
File testDir = new File(f, UUID.randomUUID().toString());
if (!testDir.mkdirs()) {
@@ -244,6 +244,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
} else {
dirs.add(f);
}
+ //noinspection ResultOfMethodCallIgnored
testDir.delete();
}
@@ -349,7 +350,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
*/
public void setDbStoragePaths(String... paths) {
if (paths == null) {
- configuredDbBasePaths = null;
+ localRocksDbDirectories = null;
}
else if (paths.length == 0) {
throw new IllegalArgumentException("empty paths");
@@ -369,7 +370,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
}
- configuredDbBasePaths = pp;
+ localRocksDbDirectories = pp;
}
}
@@ -378,12 +379,12 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* @return The configured DB storage paths, or null, if none were configured.
*/
public String[] getDbStoragePaths() {
- if (configuredDbBasePaths == null) {
+ if (localRocksDbDirectories == null) {
return null;
} else {
- String[] paths = new String[configuredDbBasePaths.length];
+ String[] paths = new String[localRocksDbDirectories.length];
for (int i = 0; i < paths.length; i++) {
- paths[i] = configuredDbBasePaths[i].toString();
+ paths[i] = localRocksDbDirectories[i].toString();
}
return paths;
}
@@ -403,7 +404,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* @param options The options to set (must not be null).
*/
public void setPredefinedOptions(PredefinedOptions options) {
- predefinedOptions = requireNonNull(options);
+ predefinedOptions = checkNotNull(options);
}
/**
@@ -496,7 +497,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
public String toString() {
return "RocksDB State Backend {" +
"isInitialized=" + isInitialized +
- ", configuredDbBasePaths=" + Arrays.toString(configuredDbBasePaths) +
+ ", configuredDbBasePaths=" + Arrays.toString(localRocksDbDirectories) +
", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
", checkpointStreamBackend=" + checkpointStreamBackend +
'}';