You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:23 UTC

[06/17] flink git commit: [hotfix] [rocksdb] Clean up RocksDB state backend code

[hotfix] [rocksdb] Clean up RocksDB state backend code

  - arrange variables to properly express configuration (client side) versus runtime (task manager side)
  - make all runtime-only fields properly transient
  - fix confusing variable name for local directories


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1931993b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1931993b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1931993b

Branch: refs/heads/master
Commit: 1931993bdc1d294a0eb9e1ad727f737cf64fe150
Parents: fa03e78
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 26 14:55:28 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 73 ++++++++++----------
 1 file changed, 37 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1931993b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 79771f3..a6552bc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -36,9 +36,12 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -49,7 +52,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A State Backend that stores its state in {@code RocksDB}. This state backend can
@@ -76,42 +79,40 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private static boolean rocksDbInitialized = false;
 
 	// ------------------------------------------------------------------------
-	//  Static configuration values
-	// ------------------------------------------------------------------------
+
+	// -- configuration values, set in the application / configuration
 
 	/** The state backend that we use for creating checkpoint streams. */
 	private final AbstractStateBackend checkpointStreamBackend;
 
-	/** Operator identifier that is used to uniquify the RocksDB storage path. */
-	private String operatorIdentifier;
-
-	/** JobID for uniquifying backup paths. */
-	private JobID jobId;
-
-	// DB storage directories
-
 	/** Base paths for RocksDB directory, as configured. May be null. */
-	private Path[] configuredDbBasePaths;
-
-	/** Base paths for RocksDB directory, as initialized. */
-	private File[] initializedDbBasePaths;
-
-	private int nextDirectory;
-
-	// RocksDB options
+	@Nullable
+	private Path[] localRocksDbDirectories;
 
 	/** The pre-configured option settings. */
 	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
 
 	/** The options factory to create the RocksDB options in the cluster. */
+	@Nullable
 	private OptionsFactory optionsFactory;
 
-	/** Whether we already lazily initialized our local storage directories. */
-	private transient boolean isInitialized = false;
-
 	/** True if incremental checkpointing is enabled. */
 	private boolean enableIncrementalCheckpointing;
 
+	// -- runtime values, set on TaskManager when initializing / using the backend
+
+	/** Base paths for RocksDB directory, as initialized. */
+	private transient File[] initializedDbBasePaths;
+
+	/** JobID for uniquifying backup paths. */
+	private transient JobID jobId;
+
+	private transient int nextDirectory;
+	
+	/** Whether we already lazily initialized our local storage directories. */
+	private transient boolean isInitialized;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
@@ -190,7 +191,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @param checkpointStreamBackend The backend to store the
 	 */
 	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
-		this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
+		this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
 	}
 
 	/**
@@ -202,10 +203,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
 	 *
 	 * @param checkpointStreamBackend The backend to store the
-	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled
+	 * @param enableIncrementalCheckpointing True if incremental checkponting is enabled
 	 */
 	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
-		this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
+		this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
 		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
 	}
 
@@ -221,19 +222,18 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			return;
 		}
 
-		this.operatorIdentifier = operatorIdentifier.replace(" ", "");
 		this.jobId = env.getJobID();
 
 		// initialize the paths where the local RocksDB files should be stored
-		if (configuredDbBasePaths == null) {
+		if (localRocksDbDirectories == null) {
 			// initialize from the temp directories
 			initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
 		}
 		else {
-			List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
+			List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
 			String errorMessage = "";
 
-			for (Path path : configuredDbBasePaths) {
+			for (Path path : localRocksDbDirectories) {
 				File f = new File(path.toUri().getPath());
 				File testDir = new File(f, UUID.randomUUID().toString());
 				if (!testDir.mkdirs()) {
@@ -244,6 +244,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				} else {
 					dirs.add(f);
 				}
+				//noinspection ResultOfMethodCallIgnored
 				testDir.delete();
 			}
 
@@ -349,7 +350,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 */
 	public void setDbStoragePaths(String... paths) {
 		if (paths == null) {
-			configuredDbBasePaths = null;
+			localRocksDbDirectories = null;
 		}
 		else if (paths.length == 0) {
 			throw new IllegalArgumentException("empty paths");
@@ -369,7 +370,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				}
 			}
 
-			configuredDbBasePaths = pp;
+			localRocksDbDirectories = pp;
 		}
 	}
 
@@ -378,12 +379,12 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @return The configured DB storage paths, or null, if none were configured.
 	 */
 	public String[] getDbStoragePaths() {
-		if (configuredDbBasePaths == null) {
+		if (localRocksDbDirectories == null) {
 			return null;
 		} else {
-			String[] paths = new String[configuredDbBasePaths.length];
+			String[] paths = new String[localRocksDbDirectories.length];
 			for (int i = 0; i < paths.length; i++) {
-				paths[i] = configuredDbBasePaths[i].toString();
+				paths[i] = localRocksDbDirectories[i].toString();
 			}
 			return paths;
 		}
@@ -403,7 +404,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @param options The options to set (must not be null).
 	 */
 	public void setPredefinedOptions(PredefinedOptions options) {
-		predefinedOptions = requireNonNull(options);
+		predefinedOptions = checkNotNull(options);
 	}
 
 	/**
@@ -496,7 +497,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	public String toString() {
 		return "RocksDB State Backend {" +
 			"isInitialized=" + isInitialized +
-			", configuredDbBasePaths=" + Arrays.toString(configuredDbBasePaths) +
+			", configuredDbBasePaths=" + Arrays.toString(localRocksDbDirectories) +
 			", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
 			", checkpointStreamBackend=" + checkpointStreamBackend +
 			'}';