You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/02 09:30:26 UTC
flink git commit: [FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.
Repository: flink
Updated Branches:
refs/heads/master ecde6c328 -> 390d36132
[FLINK-6557] [rocksdb] Use File instead of Path for RocksDB local temp directories.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390d3613
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390d3613
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390d3613
Branch: refs/heads/master
Commit: 390d36132927fe5a1bac1a53664c6c67ebe3e657
Parents: ecde6c3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 30 22:50:24 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:23:43 2018 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 69 ++++++---
.../state/RocksDBStateBackendConfigTest.java | 139 +++++++++++++++----
2 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/390d3613/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9389295..81d6265 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,6 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
/** The number of (re)tries for loading the RocksDB JNI library. */
private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+ /** Flag whether the native library has been loaded. */
private static boolean rocksDbInitialized = false;
// ------------------------------------------------------------------------
@@ -96,7 +97,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
* Null if not yet set, in which case the configuration values will be used.
* The configuration defaults to the TaskManager's temp directories. */
@Nullable
- private Path[] localRocksDbDirectories;
+ private File[] localRocksDbDirectories;
/** The pre-configured option settings. */
private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
@@ -169,6 +170,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
* @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
+ @SuppressWarnings("deprecation")
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
this(new FsStateBackend(checkpointDataUri));
}
@@ -186,6 +188,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
+ @SuppressWarnings("deprecation")
public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
}
@@ -326,16 +329,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
}
else {
List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
- String errorMessage = "";
+ StringBuilder errorMessage = new StringBuilder();
- for (Path path : localRocksDbDirectories) {
- File f = new File(path.toUri().getPath());
+ for (File f : localRocksDbDirectories) {
File testDir = new File(f, UUID.randomUUID().toString());
if (!testDir.mkdirs()) {
- String msg = "Local DB files directory '" + path
+ String msg = "Local DB files directory '" + f
+ "' does not exist and cannot be created. ";
LOG.error(msg);
- errorMessage += msg;
+ errorMessage.append(msg);
} else {
dirs.add(f);
}
@@ -455,9 +457,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
}
/**
- * Sets the paths across which the local RocksDB database files are distributed on the local
- * file system. Setting these paths overrides the default behavior, where the
- * files are stored across the configured temp directories.
+ * Sets the directories in which the local RocksDB database puts its files (like SST and
+ * metadata files). These directories do not need to be persistent, they can be ephemeral,
+ * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+ * in checkpoints.
+ *
+ * <p>If nothing is configured, these directories default to the TaskManager's local
+ * temporary file directories.
*
* <p>Each distinct state will be stored in one path, but when the state backend creates
* multiple states, they will store their files on different paths.
@@ -475,17 +481,41 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
throw new IllegalArgumentException("empty paths");
}
else {
- Path[] pp = new Path[paths.length];
+ File[] pp = new File[paths.length];
for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) {
+ final String rawPath = paths[i];
+ final String path;
+
+ if (rawPath == null) {
throw new IllegalArgumentException("null path");
}
+ else {
+ // we need this for backwards compatibility, to allow URIs like 'file:///'...
+ URI uri = null;
+ try {
+ uri = new Path(rawPath).toUri();
+ }
+ catch (Exception e) {
+ // cannot parse as a path
+ }
- pp[i] = new Path(paths[i]);
- String scheme = pp[i].toUri().getScheme();
- if (scheme != null && !scheme.equalsIgnoreCase("file")) {
- throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
+ if (uri != null && uri.getScheme() != null) {
+ if ("file".equalsIgnoreCase(uri.getScheme())) {
+ path = uri.getPath();
+ }
+ else {
+ throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
+ }
+ }
+ else {
+ path = rawPath;
+ }
+ }
+
+ pp[i] = new File(path);
+ if (!pp[i].isAbsolute()) {
+ throw new IllegalArgumentException("Relative paths are not supported");
}
}
@@ -494,8 +524,15 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
}
/**
+ * Gets the configured local DB storage paths, or null, if none were configured.
+ *
+ * <p>Under these directories on the TaskManager, RocksDB stores its SST files and
+ * metadata files. These directories do not need to be persistent, they can be ephemeral,
+ * meaning that they are lost on a machine failure, because state in RocksDB is persisted
+ * in checkpoints.
*
- * @return The configured DB storage paths, or null, if none were configured.
+ * <p>If nothing is configured, these directories default to the TaskManager's local
+ * temporary file directories.
*/
public String[] getDbStoragePaths() {
if (localRocksDbDirectories == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/390d3613/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 65d5b2e..4bc2f9f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -67,10 +70,10 @@ import static org.mockito.Mockito.when;
public class RocksDBStateBackendConfigTest {
@Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
// ------------------------------------------------------------------------
- // RocksDB local file directory
+ // default values
// ------------------------------------------------------------------------
@Test
@@ -81,39 +84,99 @@ public class RocksDBStateBackendConfigTest {
assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
}
+ // ------------------------------------------------------------------------
+ // RocksDB local file directory
+ // ------------------------------------------------------------------------
+
+ /**
+ * This test checks the behavior for basic setting of local DB directories.
+ */
@Test
public void testSetDbPath() throws Exception {
- String checkpointPath = tempFolder.newFolder().toURI().toString();
- File testDir1 = tempFolder.newFolder();
- File testDir2 = tempFolder.newFolder();
+ final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
- RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
+ final String testDir1 = tempFolder.newFolder().getAbsolutePath();
+ final String testDir2 = tempFolder.newFolder().getAbsolutePath();
assertNull(rocksDbBackend.getDbStoragePaths());
- rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
- assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+ rocksDbBackend.setDbStoragePath(testDir1);
+ assertArrayEquals(new String[] { testDir1 }, rocksDbBackend.getDbStoragePaths());
rocksDbBackend.setDbStoragePath(null);
assertNull(rocksDbBackend.getDbStoragePaths());
- rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
- assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
+ rocksDbBackend.setDbStoragePaths(testDir1, testDir2);
+ assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
- Environment env = getMockEnvironment();
- RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
- createKeyedStateBackend(
- env,
- env.getJobID(),
- "test_op",
- IntSerializer.INSTANCE,
- 1,
- new KeyGroupRange(0, 0),
- env.getTaskKvStateRegistry());
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
+
+ try {
+ File instanceBasePath = keyedBackend.getInstanceBasePath();
+ assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1), startsWith(testDir2)));
+
+ //noinspection NullArgumentToVariableArgMethod
+ rocksDbBackend.setDbStoragePaths(null);
+ assertNull(rocksDbBackend.getDbStoragePaths());
+ }
+ finally {
+ IOUtils.closeQuietly(keyedBackend);
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testStoragePathWithFilePrefix() throws Exception {
+ final File folder = tempFolder.newFolder();
+ final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+
+ assertTrue(dbStoragePath.startsWith("file:"));
+
+ testLocalDbPaths(dbStoragePath, folder);
+ }
+
+ @Test
+ public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
+ try {
+ // set the default file system scheme
+ Configuration config = new Configuration();
+ config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+ FileSystem.initialize(config);
+
+ testLocalDbPaths(null, new File(CommonTestUtils.getTempDir()));
+ }
+ finally {
+ FileSystem.initialize(new Configuration());
+ }
+ }
+
+ @Test
+ public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception {
+ final File folder = tempFolder.newFolder();
+ final String dbStoragePath = folder.getAbsolutePath();
+
+ try {
+ // set the default file system scheme
+ Configuration config = new Configuration();
+ config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
+ FileSystem.initialize(config);
+
+ testLocalDbPaths(dbStoragePath, folder);
+ }
+ finally {
+ FileSystem.initialize(new Configuration());
+ }
+ }
+
+ private void testLocalDbPaths(String configuredPath, File expectedPath) throws Exception {
+ final RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+ rocksDbBackend.setDbStoragePath(configuredPath);
+
+ RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
- assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
+ assertThat(instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath()));
//noinspection NullArgumentToVariableArgMethod
rocksDbBackend.setDbStoragePaths(null);
@@ -124,13 +187,19 @@ public class RocksDBStateBackendConfigTest {
}
}
+ /**
+ * Validates that empty arguments for the local DB path are invalid.
+ */
@Test(expected = IllegalArgumentException.class)
- public void testSetNullPaths() throws Exception {
+ public void testSetEmptyPaths() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
rocksDbBackend.setDbStoragePaths();
}
+ /**
+ * Validates that schemes other than 'file:/' are not allowed.
+ */
@Test(expected = IllegalArgumentException.class)
public void testNonFileSchemePath() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
@@ -138,6 +207,12 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testDbPathRelativePaths() throws Exception {
+ RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+ rocksDbBackend.setDbStoragePath("relative/path");
+ }
+
// ------------------------------------------------------------------------
// RocksDB local file automatic from temp directories
// ------------------------------------------------------------------------
@@ -381,7 +456,7 @@ public class RocksDBStateBackendConfigTest {
@Test
public void testCallsForwardedToNonPartitionedBackend() throws Exception {
- AbstractStateBackend storageBackend = new MemoryStateBackend();
+ StateBackend storageBackend = new MemoryStateBackend();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
}
@@ -390,6 +465,22 @@ public class RocksDBStateBackendConfigTest {
// Utilities
// ------------------------------------------------------------------------
+ static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+ RocksDBStateBackend rocksDbBackend) throws Exception {
+
+ final Environment env = getMockEnvironment();
+
+ return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+ createKeyedStateBackend(
+ env,
+ env.getJobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ env.getTaskKvStateRegistry());
+ }
+
static Environment getMockEnvironment() {
return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
}