You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/02/25 03:38:55 UTC
[flink] branch release-1.10 updated: [FLINK-10918][state backends]
Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental
checkpoints
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 3993292 [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
3993292 is described below
commit 399329275e5e2baca9ed9494cce97ff732ac077a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 13 20:50:07 2020 +0100
[FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
This solves the bug that Paths are handled wrong on Windows, breaking incremental checkpoints.
This closs #11095
---
.../main/java/org/apache/flink/util/FileUtils.java | 12 ++++++
.../flink/runtime/state/DirectoryStateHandle.java | 31 +++++++++-----
.../flink/runtime/state/SnapshotDirectory.java | 42 ++++++++-----------
.../flink/runtime/state/SnapshotDirectoryTest.java | 48 +++++-----------------
.../state/StateSnapshotTransformerTest.java | 2 +-
.../streaming/state/RocksDBStateDownloader.java | 14 +++----
.../streaming/state/RocksDBStateUploader.java | 12 +++---
.../RocksDBIncrementalRestoreOperation.java | 46 ++++++++-------------
.../snapshot/RocksIncrementalSnapshotStrategy.java | 20 ++++-----
.../state/RocksDBStateDownloaderTest.java | 8 ++--
.../streaming/state/RocksDBStateUploaderTest.java | 13 +++---
11 files changed, 109 insertions(+), 139 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 485eb02..be3e68d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -51,6 +51,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
@@ -100,6 +101,17 @@ public final class FileUtils {
// ------------------------------------------------------------------------
/**
+ * Lists the given directory in a resource-leak-safe way.
+ */
+ public static java.nio.file.Path[] listDirectory(java.nio.file.Path directory) throws IOException {
+ try (Stream<java.nio.file.Path> stream = Files.list(directory)) {
+ return stream.toArray(java.nio.file.Path[]::new);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
* Constructs a random filename with the given prefix and
* a random part generated from hex characters.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
index 2d08777..f57dc9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
/**
* This state handle represents a directory. This class is, for example, used to represent the directory of RocksDB's
@@ -34,18 +35,21 @@ public class DirectoryStateHandle implements StateObject {
/** Serial version. */
private static final long serialVersionUID = 1L;
- /** The path that describes the directory. */
- @Nonnull
- private final Path directory;
+ /** The path that describes the directory, as a string, to be serializable. */
+ private final String directoryString;
+
+ /** Transient path cache, to avoid re-parsing the string. */
+ private transient Path directory;
public DirectoryStateHandle(@Nonnull Path directory) {
this.directory = directory;
+ this.directoryString = directory.toString();
}
@Override
public void discardState() throws IOException {
- FileSystem fileSystem = directory.getFileSystem();
- fileSystem.delete(directory, true);
+ ensurePath();
+ FileUtils.deleteDirectory(directory.toFile());
}
@Override
@@ -56,9 +60,16 @@ public class DirectoryStateHandle implements StateObject {
@Nonnull
public Path getDirectory() {
+ ensurePath();
return directory;
}
+ private void ensurePath() {
+ if (directory == null) {
+ directory = Paths.get(directoryString);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -70,18 +81,18 @@ public class DirectoryStateHandle implements StateObject {
DirectoryStateHandle that = (DirectoryStateHandle) o;
- return directory.equals(that.directory);
+ return directoryString.equals(that.directoryString);
}
@Override
public int hashCode() {
- return directory.hashCode();
+ return directoryString.hashCode();
}
@Override
public String toString() {
return "DirectoryStateHandle{" +
- "directory=" + directory +
+ "directory=" + directoryString +
'}';
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
index 19e4b52..5809daf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
@@ -18,15 +18,15 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -49,50 +49,37 @@ public abstract class SnapshotDirectory {
@Nonnull
protected final Path directory;
- /** The filesystem that contains the snapshot directory. */
- @Nonnull
- protected final FileSystem fileSystem;
-
/** This reference tracks the lifecycle state of the snapshot directory. */
@Nonnull
protected AtomicReference<State> state;
- private SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem fileSystem) {
+ private SnapshotDirectory(@Nonnull Path directory) {
this.directory = directory;
- this.fileSystem = fileSystem;
this.state = new AtomicReference<>(State.ONGOING);
}
- private SnapshotDirectory(@Nonnull Path directory) throws IOException {
- this(directory, directory.getFileSystem());
- }
-
@Nonnull
public Path getDirectory() {
return directory;
}
public boolean mkdirs() throws IOException {
- return fileSystem.mkdirs(directory);
- }
-
- @Nonnull
- public FileSystem getFileSystem() {
- return fileSystem;
+ Files.createDirectories(directory);
+ return true;
}
public boolean exists() throws IOException {
- return fileSystem.exists(directory);
+ return Files.exists(directory);
}
/**
- * List the statuses of the files/directories in the snapshot directory.
+ * List the files in the snapshot directory.
*
- * @return the statuses of the files/directories in the given path.
+ * @return the files in the snapshot directory.
* @throws IOException if there is a problem creating the file statuses.
*/
- public FileStatus[] listStatus() throws IOException {
- return fileSystem.listStatus(directory);
+ public Path[] listDirectory() throws IOException {
+ return FileUtils.listDirectory(directory);
}
/**
@@ -103,7 +90,10 @@ public abstract class SnapshotDirectory {
* @throws IOException if an exception happens during the delete.
*/
public boolean cleanup() throws IOException {
- return !state.compareAndSet(State.ONGOING, State.DELETED) || fileSystem.delete(directory, true);
+ if (state.compareAndSet(State.ONGOING, State.DELETED)) {
+ FileUtils.deleteDirectory(directory.toFile());
+ }
+ return true;
}
/**
@@ -174,7 +164,7 @@ public abstract class SnapshotDirectory {
private static class TemporarySnapshotDirectory extends SnapshotDirectory {
TemporarySnapshotDirectory(@Nonnull File directory) throws IOException {
- super(new Path(directory.toURI()), FileSystem.getLocalFileSystem());
+ super(directory.toPath());
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
index 392e844..67ba641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
@@ -18,10 +18,7 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -32,6 +29,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;
@@ -61,7 +59,7 @@ public class SnapshotDirectoryTest extends TestLogger {
File folderRoot = temporaryFolder.getRoot();
File newFolder = new File(folderRoot, String.valueOf(UUID.randomUUID()));
File innerNewFolder = new File(newFolder, String.valueOf(UUID.randomUUID()));
- Path path = new Path(innerNewFolder.toURI());
+ Path path = innerNewFolder.toPath();
Assert.assertFalse(newFolder.isDirectory());
Assert.assertFalse(innerNewFolder.isDirectory());
@@ -85,7 +83,7 @@ public class SnapshotDirectoryTest extends TestLogger {
File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
Assert.assertFalse(folderA.isDirectory());
- Path path = new Path(folderA.toURI());
+ Path path = folderA.toPath();
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
Assert.assertFalse(snapshotDirectory.exists());
Assert.assertTrue(folderA.mkdirs());
@@ -106,14 +104,13 @@ public class SnapshotDirectoryTest extends TestLogger {
File file = new File(folderA, "test.txt");
Assert.assertTrue(file.createNewFile());
- Path path = new Path(folderA.toURI());
- FileSystem fileSystem = path.getFileSystem();
+ Path path = folderA.toPath();
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
Assert.assertTrue(snapshotDirectory.exists());
Assert.assertEquals(
- Arrays.toString(fileSystem.listStatus(path)),
- Arrays.toString(snapshotDirectory.listStatus()));
+ Arrays.toString(FileUtils.listDirectory(path)),
+ Arrays.toString(snapshotDirectory.listDirectory()));
}
/**
@@ -125,7 +122,7 @@ public class SnapshotDirectoryTest extends TestLogger {
File folderRoot = temporaryFolder.getRoot();
File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
Assert.assertTrue(folderA.mkdirs());
- Path folderAPath = new Path(folderA.toURI());
+ Path folderAPath = folderA.toPath();
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
@@ -160,7 +157,7 @@ public class SnapshotDirectoryTest extends TestLogger {
Assert.assertTrue(folderB.mkdirs());
File file = new File(folderA, "test.txt");
Assert.assertTrue(file.createNewFile());
- Path folderAPath = new Path(folderA.toURI());
+ Path folderAPath = folderA.toPath();
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
Assert.assertTrue(snapshotDirectory.cleanup());
Assert.assertFalse(folderA.isDirectory());
@@ -181,7 +178,7 @@ public class SnapshotDirectoryTest extends TestLogger {
File folderRoot = temporaryFolder.getRoot();
File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
Assert.assertTrue(folderA.mkdirs());
- Path pathA = new Path(folderA.toURI());
+ Path pathA = folderA.toPath();
SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(pathA);
Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
Assert.assertNotNull(snapshotDirectory.completeSnapshotAndGetHandle());
@@ -207,29 +204,4 @@ public class SnapshotDirectoryTest extends TestLogger {
Assert.assertTrue(tmpSnapshotDirectory.cleanup());
Assert.assertFalse(folder.exists());
}
-
- /**
- * Tests that we always use the local file system even if we have specified a different default
- * file system. See FLINK-12042.
- */
- @Test
- public void testLocalFileSystemIsUsedForTemporary() throws Exception {
- // ensure that snapshot directory will always use the local file system instead of the default file system
- Configuration configuration = new Configuration();
- configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "nonexistfs:///");
- FileSystem.initialize(configuration);
-
- final File folderRoot = temporaryFolder.getRoot();
-
- try {
- File folderB = new File(folderRoot, String.valueOf(UUID.randomUUID()));
- // only pass the path and leave the scheme missing
- SnapshotDirectory snapshotDirectoryB = SnapshotDirectory.temporary(folderB);
- Assert.assertEquals(snapshotDirectoryB.getFileSystem(), FileSystem.getLocalFileSystem());
- } finally {
- // restore the FileSystem configuration
- FileSystem.initialize(new Configuration());
- }
- }
-
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
index 8450076..1c82a8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.StringUtils;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index aa3201c..42a3f63 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,9 +19,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
@@ -31,6 +28,9 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -103,7 +103,7 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
StateHandleID stateHandleID = entry.getKey();
StreamStateHandle remoteFileHandle = entry.getValue();
- Path path = new Path(restoreInstancePath, stateHandleID.toString());
+ Path path = restoreInstancePath.resolve(stateHandleID.toString());
runnables.add(ThrowingRunnable.unchecked(
() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)));
@@ -120,14 +120,14 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
CloseableRegistry closeableRegistry) throws IOException {
FSDataInputStream inputStream = null;
- FSDataOutputStream outputStream = null;
+ OutputStream outputStream = null;
try {
- FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
inputStream = remoteFileHandle.openInputStream();
closeableRegistry.registerCloseable(inputStream);
- outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+ Files.createDirectories(restoreFilePath.getParent());
+ outputStream = Files.newOutputStream(restoreFilePath);
closeableRegistry.registerCloseable(outputStream);
byte[] buffer = new byte[8 * 1024];
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 045918d..2111c52 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -19,9 +19,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -35,6 +32,9 @@ import org.apache.flink.util.function.CheckedSupplier;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -107,14 +107,14 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer {
Path filePath,
CheckpointStreamFactory checkpointStreamFactory,
CloseableRegistry closeableRegistry) throws IOException {
- FSDataInputStream inputStream = null;
+
+ InputStream inputStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
try {
final byte[] buffer = new byte[READ_BUFFER_SIZE];
- FileSystem backupFileSystem = filePath.getFileSystem();
- inputStream = backupFileSystem.open(filePath);
+ inputStream = Files.newInputStream(filePath);
closeableRegistry.registerCloseable(inputStream);
outputStream = checkpointStreamFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 35b6bab..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -28,10 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
@@ -48,6 +44,7 @@ import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -64,7 +61,10 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
@@ -187,9 +187,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
}
private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception {
- final Path tmpRestoreInstancePath = new Path(
- instanceBasePath.getAbsolutePath(),
- UUID.randomUUID().toString()); // used as restore source for IncrementalRemoteKeyedStateHandle
+ // used as restore source for IncrementalRemoteKeyedStateHandle
+ final Path tmpRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
try {
restoreFromLocalState(
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
@@ -246,10 +245,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
private void cleanUpPathQuietly(@Nonnull Path path) {
try {
- FileSystem fileSystem = path.getFileSystem();
- if (fileSystem.exists(path)) {
- fileSystem.delete(path, true);
- }
+ FileUtils.deleteDirectory(path.toFile());
} catch (IOException ex) {
LOG.warn("Failed to clean up path " + path, ex);
}
@@ -296,7 +292,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
", but found " + rawStateHandle.getClass());
}
- Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
+ Path temporaryRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
(IncrementalRemoteKeyedStateHandle) rawStateHandle,
temporaryRestoreInstancePath);
@@ -427,7 +423,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
RocksDB restoreDb = RocksDBOperationUtils.openDB(
- temporaryRestoreInstancePath.getPath(),
+ temporaryRestoreInstancePath.toString(),
columnFamilyDescriptors,
columnFamilyHandles,
RocksDBOperationUtils.createColumnFamilyOptions(columnFamilyOptionsFactory, "default"),
@@ -461,26 +457,18 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
* a local state.
*/
private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath) throws IOException {
+ final Path instanceRocksDBDirectory = Paths.get(instanceRocksDBPath);
+ final Path[] files = FileUtils.listDirectory(source);
- FileSystem fileSystem = source.getFileSystem();
-
- final FileStatus[] fileStatuses = fileSystem.listStatus(source);
-
- if (fileStatuses == null) {
- throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
- }
-
- for (FileStatus fileStatus : fileStatuses) {
- final Path filePath = fileStatus.getPath();
- final String fileName = filePath.getName();
- File restoreFile = new File(source.getPath(), fileName);
- File targetFile = new File(instanceRocksDBPath, fileName);
+ for (Path file : files) {
+ final String fileName = file.getFileName().toString();
+ final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
// hardlink'ing the immutable sst-files.
- Files.createLink(targetFile.toPath(), restoreFile.toPath());
+ Files.createLink(targetFile, file);
} else {
// true copy for all other files.
- Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
}
}
}
@@ -490,7 +478,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
*/
private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
- FSDataInputStream inputStream = null;
+ InputStream inputStream = null;
try {
inputStream = metaStateHandle.openInputStream();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 16c72cc..23f4574 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -64,6 +62,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -200,7 +199,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
FileUtils.deleteDirectory(rdbSnapshotDir);
}
- Path path = new Path(rdbSnapshotDir.toURI());
+ Path path = rdbSnapshotDir.toPath();
// create a "permanent" snapshot directory because local recovery is active.
try {
return SnapshotDirectory.permanent(path);
@@ -246,7 +245,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
try (
ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
Checkpoint checkpoint = Checkpoint.create(db)) {
- checkpoint.createCheckpoint(outputDirectory.getDirectory().getPath());
+ checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
} catch (Exception ex) {
try {
outputDirectory.cleanup();
@@ -417,9 +416,9 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
- FileStatus[] fileStatuses = localBackupDirectory.listStatus();
- if (fileStatuses != null) {
- createUploadFilePaths(fileStatuses, sstFiles, sstFilePaths, miscFilePaths);
+ Path[] files = localBackupDirectory.listDirectory();
+ if (files != null) {
+ createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
sstFiles.putAll(stateUploader.uploadFilesToCheckpointFs(
sstFilePaths,
@@ -433,13 +432,12 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
}
private void createUploadFilePaths(
- FileStatus[] fileStatuses,
+ Path[] files,
Map<StateHandleID, StreamStateHandle> sstFiles,
Map<StateHandleID, Path> sstFilePaths,
Map<StateHandleID, Path> miscFilePaths) {
- for (FileStatus fileStatus : fileStatuses) {
- final Path filePath = fileStatus.getPath();
- final String fileName = filePath.getName();
+ for (Path filePath : files) {
+ final String fileName = filePath.getFileName().toString();
final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index 76372b7..897b855 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateHandleID;
@@ -34,6 +33,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
@@ -91,7 +91,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(
incrementalKeyedStateHandle,
- new Path(temporaryFolder.newFolder().toURI()),
+ temporaryFolder.newFolder().toPath(),
new CloseableRegistry());
fail();
} catch (Exception e) {
@@ -133,13 +133,13 @@ public class RocksDBStateDownloaderTest extends TestLogger {
privateStates,
handles.get(0));
- Path dstPath = new Path(temporaryFolder.newFolder().toURI());
+ Path dstPath = temporaryFolder.newFolder().toPath();
try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
rocksDBStateDownloader.transferAllStateDataToDirectory(incrementalKeyedStateHandle, dstPath, new CloseableRegistry());
}
for (int i = 0; i < contentNum; ++i) {
- assertStateContentEqual(contents[i], new Path(dstPath, String.format("sharedState%d", i)));
+ assertStateContentEqual(contents[i], dstPath.resolve(String.format("sharedState%d", i)));
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 8ac2f3c..1c99739 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StateHandleID;
@@ -41,7 +40,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -71,7 +70,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
generateRandomFileContent(file.getPath(), 20);
Map<StateHandleID, Path> filePaths = new HashMap<>(1);
- filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+ filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
fail();
@@ -86,10 +85,10 @@ public class RocksDBStateUploaderTest extends TestLogger {
@Test
public void testMultiThreadUploadCorrectly() throws Exception {
File checkpointPrivateFolder = temporaryFolder.newFolder("private");
- Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+ org.apache.flink.core.fs.Path checkpointPrivateDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
File checkpointSharedFolder = temporaryFolder.newFolder("shared");
- Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+ org.apache.flink.core.fs.Path checkpointSharedDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
int fileStateSizeThreshold = 1024;
@@ -157,7 +156,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
for (int i = 0; i < sstFileCount; ++i) {
File file = temporaryFolder.newFile(String.format("%s/%d.sst", localFolder, i));
generateRandomFileContent(file.getPath(), random.nextInt(1_000_000) + fileStateSizeThreshold);
- sstFilePaths.put(new StateHandleID(String.valueOf(i)), Path.fromLocalFile(file));
+ sstFilePaths.put(new StateHandleID(String.valueOf(i)), file.toPath());
}
return sstFilePaths;
}
@@ -171,7 +170,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
}
private void assertStateContentEqual(Path stateFilePath, FSDataInputStream inputStream) throws IOException {
- byte[] excepted = Files.readAllBytes(Paths.get(stateFilePath.toUri()));
+ byte[] excepted = Files.readAllBytes(stateFilePath);
byte[] actual = new byte[excepted.length];
IOUtils.readFully(inputStream, actual, 0, actual.length);
assertEquals(-1, inputStream.read());