You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/02 09:30:26 UTC

flink git commit: [FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.

Repository: flink
Updated Branches:
  refs/heads/master ecde6c328 -> 390d36132


[FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390d3613
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390d3613
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390d3613

Branch: refs/heads/master
Commit: 390d36132927fe5a1bac1a53664c6c67ebe3e657
Parents: ecde6c3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 30 22:50:24 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:23:43 2018 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  69 ++++++---
 .../state/RocksDBStateBackendConfigTest.java    | 139 +++++++++++++++----
 2 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/390d3613/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9389295..81d6265 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,6 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	/** The number of (re)tries for loading the RocksDB JNI library. */
 	private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
+	/** Flag whether the native library has been loaded. */
 	private static boolean rocksDbInitialized = false;
 
 	// ------------------------------------------------------------------------
@@ -96,7 +97,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * Null if not yet set, in which case the configuration values will be used.
 	 * The configuration defaults to the TaskManager's temp directories. */
 	@Nullable
-	private Path[] localRocksDbDirectories;
+	private File[] localRocksDbDirectories;
 
 	/** The pre-configured option settings. */
 	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
@@ -169,6 +170,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
+	@SuppressWarnings("deprecation")
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
 		this(new FsStateBackend(checkpointDataUri));
 	}
@@ -186,6 +188,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
+	@SuppressWarnings("deprecation")
 	public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
 		this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
 	}
@@ -326,16 +329,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		}
 		else {
 			List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
-			String errorMessage = "";
+			StringBuilder errorMessage = new StringBuilder();
 
-			for (Path path : localRocksDbDirectories) {
-				File f = new File(path.toUri().getPath());
+			for (File f : localRocksDbDirectories) {
 				File testDir = new File(f, UUID.randomUUID().toString());
 				if (!testDir.mkdirs()) {
-					String msg = "Local DB files directory '" + path
+					String msg = "Local DB files directory '" + f
 							+ "' does not exist and cannot be created. ";
 					LOG.error(msg);
-					errorMessage += msg;
+					errorMessage.append(msg);
 				} else {
 					dirs.add(f);
 				}
@@ -455,9 +457,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	}
 
 	/**
-	 * Sets the paths across which the local RocksDB database files are distributed on the local
-	 * file system. Setting these paths overrides the default behavior, where the
-	 * files are stored across the configured temp directories.
+	 * Sets the directories in which the local RocksDB database puts its files (like SST and
+	 * metadata files). These directories do not need to be persistent, they can be ephemeral,
+	 * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+	 * in checkpoints.
+	 *
+	 * <p>If nothing is configured, these directories default to the TaskManager's local
+	 * temporary file directories.
 	 *
 	 * <p>Each distinct state will be stored in one path, but when the state backend creates
 	 * multiple states, they will store their files on different paths.
@@ -475,17 +481,41 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			throw new IllegalArgumentException("empty paths");
 		}
 		else {
-			Path[] pp = new Path[paths.length];
+			File[] pp = new File[paths.length];
 
 			for (int i = 0; i < paths.length; i++) {
-				if (paths[i] == null) {
+				final String rawPath = paths[i];
+				final String path;
+
+				if (rawPath == null) {
 					throw new IllegalArgumentException("null path");
 				}
+				else {
+					// we need this for backwards compatibility, to allow URIs like 'file:///'...
+					URI uri = null;
+					try {
+						uri = new Path(rawPath).toUri();
+					}
+					catch (Exception e) {
+						// cannot parse as a path
+					}
 
-				pp[i] = new Path(paths[i]);
-				String scheme = pp[i].toUri().getScheme();
-				if (scheme != null && !scheme.equalsIgnoreCase("file")) {
-					throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
+					if (uri != null && uri.getScheme() != null) {
+						if ("file".equalsIgnoreCase(uri.getScheme())) {
+							path = uri.getPath();
+						}
+						else {
+							throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
+						}
+					}
+					else {
+						path = rawPath;
+					}
+				}
+
+				pp[i] = new File(path);
+				if (!pp[i].isAbsolute()) {
+					throw new IllegalArgumentException("Relative paths are not supported");
 				}
 			}
 
@@ -494,8 +524,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	}
 
 	/**
+	 * Gets the configured local DB storage paths, or null, if none were configured.
+	 *
+	 * <p>Under these directories on the TaskManager, RocksDB stores its SST files and
+	 * metadata files. These directories do not need to be persistent, they can be ephemeral,
+	 * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+	 * in checkpoints.
 	 *
-	 * @return The configured DB storage paths, or null, if none were configured.
+	 * <p>If nothing is configured, these directories default to the TaskManager's local
+	 * temporary file directories.
 	 */
 	public String[] getDbStoragePaths() {
 		if (localRocksDbDirectories == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/390d3613/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 65d5b2e..4bc2f9f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -67,10 +70,10 @@ import static org.mockito.Mockito.when;
 public class RocksDBStateBackendConfigTest {
 
 	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
 
 	// ------------------------------------------------------------------------
-	//  RocksDB local file directory
+	//  default values
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -81,39 +84,99 @@ public class RocksDBStateBackendConfigTest {
 		assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
 	}
 
+	// ------------------------------------------------------------------------
+	//  RocksDB local file directory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This test checks the behavior for basic setting of local DB directories.
+	 */
 	@Test
 	public void testSetDbPath() throws Exception {
-		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		File testDir1 = tempFolder.newFolder();
-		File testDir2 = tempFolder.newFolder();
+		final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
+		final String testDir1 = tempFolder.newFolder().getAbsolutePath();
+		final String testDir2 = tempFolder.newFolder().getAbsolutePath();
 
 		assertNull(rocksDbBackend.getDbStoragePaths());
 
-		rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
-		assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+		rocksDbBackend.setDbStoragePath(testDir1);
+		assertArrayEquals(new String[] { testDir1 }, rocksDbBackend.getDbStoragePaths());
 
 		rocksDbBackend.setDbStoragePath(null);
 		assertNull(rocksDbBackend.getDbStoragePaths());
 
-		rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
-		assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+		rocksDbBackend.setDbStoragePaths(testDir1, testDir2);
+		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
-		Environment env = getMockEnvironment();
-		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-				createKeyedStateBackend(
-						env,
-						env.getJobID(),
-						"test_op",
-						IntSerializer.INSTANCE,
-						1,
-						new KeyGroupRange(0, 0),
-						env.getTaskKvStateRegistry());
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
+
+		try {
+			File instanceBasePath = keyedBackend.getInstanceBasePath();
+			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1), startsWith(testDir2)));
+
+			//noinspection NullArgumentToVariableArgMethod
+			rocksDbBackend.setDbStoragePaths(null);
+			assertNull(rocksDbBackend.getDbStoragePaths());
+		}
+		finally {
+			IOUtils.closeQuietly(keyedBackend);
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testStoragePathWithFilePrefix() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+
+		assertTrue(dbStoragePath.startsWith("file:"));
+
+		testLocalDbPaths(dbStoragePath, folder);
+	}
+
+	@Test
+	public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
+		try {
+			// set the default file system scheme
+			Configuration config = new Configuration();
+			config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+			FileSystem.initialize(config);
+
+			testLocalDbPaths(null, new File(CommonTestUtils.getTempDir()));
+		}
+		finally {
+			FileSystem.initialize(new Configuration());
+		}
+	}
+
+	@Test
+	public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception {
+		final File folder = tempFolder.newFolder();
+		final String dbStoragePath = folder.getAbsolutePath();
+
+		try {
+			// set the default file system scheme
+			Configuration config = new Configuration();
+			config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+			FileSystem.initialize(config);
+
+			testLocalDbPaths(dbStoragePath, folder);
+		}
+		finally {
+			FileSystem.initialize(new Configuration());
+		}
+	}
+
+	private void testLocalDbPaths(String configuredPath, File expectedPath) throws Exception {
+		final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+		rocksDbBackend.setDbStoragePath(configuredPath);
+
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
-			assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
+			assertThat(instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath()));
 
 			//noinspection NullArgumentToVariableArgMethod
 			rocksDbBackend.setDbStoragePaths(null);
@@ -124,13 +187,19 @@ public class RocksDBStateBackendConfigTest {
 		}
 	}
 
+	/**
+	 * Validates that empty arguments for the local DB path are invalid.
+	 */
 	@Test(expected = IllegalArgumentException.class)
-	public void testSetNullPaths() throws Exception {
+	public void testSetEmptyPaths() throws Exception {
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
 		rocksDbBackend.setDbStoragePaths();
 	}
 
+	/**
+	 * Validates that schemes other than 'file:/' are not allowed.
+	 */
 	@Test(expected = IllegalArgumentException.class)
 	public void testNonFileSchemePath() throws Exception {
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
@@ -138,6 +207,12 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testDbPathRelativePaths() throws Exception {
+		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+		rocksDbBackend.setDbStoragePath("relative/path");
+	}
+
 	// ------------------------------------------------------------------------
 	//  RocksDB local file automatic from temp directories
 	// ------------------------------------------------------------------------
@@ -381,7 +456,7 @@ public class RocksDBStateBackendConfigTest {
 
 	@Test
 	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
-		AbstractStateBackend storageBackend = new MemoryStateBackend();
+		StateBackend storageBackend = new MemoryStateBackend();
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
 		assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
 	}
@@ -390,6 +465,22 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend) throws Exception {
+
+		final Environment env = getMockEnvironment();
+
+		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+				createKeyedStateBackend(
+						env,
+						env.getJobID(),
+						"test_op",
+						IntSerializer.INSTANCE,
+						1,
+						new KeyGroupRange(0, 0),
+						env.getTaskKvStateRegistry());
+	}
+
 	static Environment getMockEnvironment() {
 		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
 	}