You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/02/25 03:38:55 UTC

[flink] branch release-1.10 updated: [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 3993292  [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
3993292 is described below

commit 399329275e5e2baca9ed9494cce97ff732ac077a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 13 20:50:07 2020 +0100

    [FLINK-10918][state backends] Change o.a.f.core.fs.Path to java.nio.file.Path in RocksDB incremental checkpoints
    
    This fixes the bug that paths are handled incorrectly on Windows, which breaks incremental checkpoints.
    
    This closes #11095
---
 .../main/java/org/apache/flink/util/FileUtils.java | 12 ++++++
 .../flink/runtime/state/DirectoryStateHandle.java  | 31 +++++++++-----
 .../flink/runtime/state/SnapshotDirectory.java     | 42 ++++++++-----------
 .../flink/runtime/state/SnapshotDirectoryTest.java | 48 +++++-----------------
 .../state/StateSnapshotTransformerTest.java        |  2 +-
 .../streaming/state/RocksDBStateDownloader.java    | 14 +++----
 .../streaming/state/RocksDBStateUploader.java      | 12 +++---
 .../RocksDBIncrementalRestoreOperation.java        | 46 ++++++++-------------
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 20 ++++-----
 .../state/RocksDBStateDownloaderTest.java          |  8 ++--
 .../streaming/state/RocksDBStateUploaderTest.java  | 13 +++---
 11 files changed, 109 insertions(+), 139 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 485eb02..be3e68d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -51,6 +51,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
@@ -100,6 +101,17 @@ public final class FileUtils {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Lists the given directory in a resource-leak-safe way.
+	 */
+	public static java.nio.file.Path[] listDirectory(java.nio.file.Path directory) throws IOException {
+		try (Stream<java.nio.file.Path> stream = Files.list(directory)) {
+			return stream.toArray(java.nio.file.Path[]::new);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Constructs a random filename with the given prefix and
 	 * a random part generated from hex characters.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
index 2d08777..f57dc9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 /**
  * This state handle represents a directory. This class is, for example, used to represent the directory of RocksDB's
@@ -34,18 +35,21 @@ public class DirectoryStateHandle implements StateObject {
 	/** Serial version. */
 	private static final long serialVersionUID = 1L;
 
-	/** The path that describes the directory. */
-	@Nonnull
-	private final Path directory;
+	/** The path that describes the directory, as a string, to be serializable. */
+	private final String directoryString;
+
+	/** Transient path cache, to avoid re-parsing the string. */
+	private transient Path directory;
 
 	public DirectoryStateHandle(@Nonnull Path directory) {
 		this.directory = directory;
+		this.directoryString = directory.toString();
 	}
 
 	@Override
 	public void discardState() throws IOException {
-		FileSystem fileSystem = directory.getFileSystem();
-		fileSystem.delete(directory, true);
+		ensurePath();
+		FileUtils.deleteDirectory(directory.toFile());
 	}
 
 	@Override
@@ -56,9 +60,16 @@ public class DirectoryStateHandle implements StateObject {
 
 	@Nonnull
 	public Path getDirectory() {
+		ensurePath();
 		return directory;
 	}
 
+	private void ensurePath() {
+		if (directory == null) {
+			directory = Paths.get(directoryString);
+		}
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -70,18 +81,18 @@ public class DirectoryStateHandle implements StateObject {
 
 		DirectoryStateHandle that = (DirectoryStateHandle) o;
 
-		return directory.equals(that.directory);
+		return directoryString.equals(that.directoryString);
 	}
 
 	@Override
 	public int hashCode() {
-		return directory.hashCode();
+		return directoryString.hashCode();
 	}
 
 	@Override
 	public String toString() {
 		return "DirectoryStateHandle{" +
-			"directory=" + directory +
+			"directory=" + directoryString +
 			'}';
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
index 19e4b52..5809daf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -49,50 +49,37 @@ public abstract class SnapshotDirectory {
 	@Nonnull
 	protected final Path directory;
 
-	/** The filesystem that contains the snapshot directory. */
-	@Nonnull
-	protected final FileSystem fileSystem;
-
 	/** This reference tracks the lifecycle state of the snapshot directory. */
 	@Nonnull
 	protected AtomicReference<State> state;
 
-	private SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem fileSystem) {
+	private SnapshotDirectory(@Nonnull Path directory) {
 		this.directory = directory;
-		this.fileSystem = fileSystem;
 		this.state = new AtomicReference<>(State.ONGOING);
 	}
 
-	private SnapshotDirectory(@Nonnull Path directory) throws IOException {
-		this(directory, directory.getFileSystem());
-	}
-
 	@Nonnull
 	public Path getDirectory() {
 		return directory;
 	}
 
 	public boolean mkdirs() throws IOException {
-		return fileSystem.mkdirs(directory);
-	}
-
-	@Nonnull
-	public FileSystem getFileSystem() {
-		return fileSystem;
+		Files.createDirectories(directory);
+		return true;
 	}
 
 	public boolean exists() throws IOException {
-		return fileSystem.exists(directory);
+		return Files.exists(directory);
 	}
 
 	/**
-	 * List the statuses of the files/directories in the snapshot directory.
+	 * List the files in the snapshot directory.
 	 *
-	 * @return the statuses of the files/directories in the given path.
+	 * @return the files in the snapshot directory.
 	 * @throws IOException if there is a problem creating the file statuses.
 	 */
-	public FileStatus[] listStatus() throws IOException {
-		return fileSystem.listStatus(directory);
+	public Path[] listDirectory() throws IOException {
+		return FileUtils.listDirectory(directory);
 	}
 
 	/**
@@ -103,7 +90,10 @@ public abstract class SnapshotDirectory {
 	 * @throws IOException if an exception happens during the delete.
 	 */
 	public boolean cleanup() throws IOException {
-		return !state.compareAndSet(State.ONGOING, State.DELETED) || fileSystem.delete(directory, true);
+		if (state.compareAndSet(State.ONGOING, State.DELETED)) {
+			FileUtils.deleteDirectory(directory.toFile());
+		}
+		return true;
 	}
 
 	/**
@@ -174,7 +164,7 @@ public abstract class SnapshotDirectory {
 	private static class TemporarySnapshotDirectory extends SnapshotDirectory {
 
 		TemporarySnapshotDirectory(@Nonnull File directory) throws IOException {
-			super(new Path(directory.toURI()), FileSystem.getLocalFileSystem());
+			super(directory.toPath());
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
index 392e844..67ba641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -32,6 +29,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.UUID;
 
@@ -61,7 +59,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File newFolder = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		File innerNewFolder = new File(newFolder, String.valueOf(UUID.randomUUID()));
-		Path path = new Path(innerNewFolder.toURI());
+		Path path = innerNewFolder.toPath();
 
 		Assert.assertFalse(newFolder.isDirectory());
 		Assert.assertFalse(innerNewFolder.isDirectory());
@@ -85,7 +83,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 
 		Assert.assertFalse(folderA.isDirectory());
-		Path path = new Path(folderA.toURI());
+		Path path = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
 		Assert.assertFalse(snapshotDirectory.exists());
 		Assert.assertTrue(folderA.mkdirs());
@@ -106,14 +104,13 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File file = new File(folderA, "test.txt");
 		Assert.assertTrue(file.createNewFile());
 
-		Path path = new Path(folderA.toURI());
-		FileSystem fileSystem = path.getFileSystem();
+		Path path = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path);
 		Assert.assertTrue(snapshotDirectory.exists());
 
 		Assert.assertEquals(
-			Arrays.toString(fileSystem.listStatus(path)),
-			Arrays.toString(snapshotDirectory.listStatus()));
+			Arrays.toString(FileUtils.listDirectory(path)),
+			Arrays.toString(snapshotDirectory.listDirectory()));
 	}
 
 	/**
@@ -125,7 +122,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		Assert.assertTrue(folderA.mkdirs());
-		Path folderAPath = new Path(folderA.toURI());
+		Path folderAPath = folderA.toPath();
 
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
 
@@ -160,7 +157,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		Assert.assertTrue(folderB.mkdirs());
 		File file = new File(folderA, "test.txt");
 		Assert.assertTrue(file.createNewFile());
-		Path folderAPath = new Path(folderA.toURI());
+		Path folderAPath = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
 		Assert.assertTrue(snapshotDirectory.cleanup());
 		Assert.assertFalse(folderA.isDirectory());
@@ -181,7 +178,7 @@ public class SnapshotDirectoryTest extends TestLogger {
 		File folderRoot = temporaryFolder.getRoot();
 		File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID()));
 		Assert.assertTrue(folderA.mkdirs());
-		Path pathA = new Path(folderA.toURI());
+		Path pathA = folderA.toPath();
 		SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(pathA);
 		Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
 		Assert.assertNotNull(snapshotDirectory.completeSnapshotAndGetHandle());
@@ -207,29 +204,4 @@ public class SnapshotDirectoryTest extends TestLogger {
 		Assert.assertTrue(tmpSnapshotDirectory.cleanup());
 		Assert.assertFalse(folder.exists());
 	}
-
-	/**
-	 * Tests that we always use the local file system even if we have specified a different default
-	 * file system. See FLINK-12042.
-	 */
-	@Test
-	public void testLocalFileSystemIsUsedForTemporary() throws Exception {
-		// ensure that snapshot directory will always use the local file system instead of the default file system
-		Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "nonexistfs:///");
-		FileSystem.initialize(configuration);
-
-		final File folderRoot = temporaryFolder.getRoot();
-
-		try {
-			File folderB = new File(folderRoot, String.valueOf(UUID.randomUUID()));
-			// only pass the path and leave the scheme missing
-			SnapshotDirectory snapshotDirectoryB = SnapshotDirectory.temporary(folderB);
-			Assert.assertEquals(snapshotDirectoryB.getFileSystem(), FileSystem.getLocalFileSystem());
-		} finally {
-			// restore the FileSystem configuration
-			FileSystem.initialize(new Configuration());
-		}
-	}
-
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
index 8450076..1c82a8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.StringUtils;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index aa3201c..42a3f63 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,9 +19,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -31,6 +28,9 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -103,7 +103,7 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
 			StateHandleID stateHandleID = entry.getKey();
 			StreamStateHandle remoteFileHandle = entry.getValue();
 
-			Path path = new Path(restoreInstancePath, stateHandleID.toString());
+			Path path = restoreInstancePath.resolve(stateHandleID.toString());
 
 			runnables.add(ThrowingRunnable.unchecked(
 				() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)));
@@ -120,14 +120,14 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
 		CloseableRegistry closeableRegistry) throws IOException {
 
 		FSDataInputStream inputStream = null;
-		FSDataOutputStream outputStream = null;
+		OutputStream outputStream = null;
 
 		try {
-			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
 			inputStream = remoteFileHandle.openInputStream();
 			closeableRegistry.registerCloseable(inputStream);
 
-			outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+			Files.createDirectories(restoreFilePath.getParent());
+			outputStream = Files.newOutputStream(restoreFilePath);
 			closeableRegistry.registerCloseable(outputStream);
 
 			byte[] buffer = new byte[8 * 1024];
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 045918d..2111c52 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -19,9 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -35,6 +32,9 @@ import org.apache.flink.util.function.CheckedSupplier;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -107,14 +107,14 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer {
 		Path filePath,
 		CheckpointStreamFactory checkpointStreamFactory,
 		CloseableRegistry closeableRegistry) throws IOException {
-		FSDataInputStream inputStream = null;
+
+		InputStream inputStream = null;
 		CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
 
 		try {
 			final byte[] buffer = new byte[READ_BUFFER_SIZE];
 
-			FileSystem backupFileSystem = filePath.getFileSystem();
-			inputStream = backupFileSystem.open(filePath);
+			inputStream = Files.newInputStream(filePath);
 			closeableRegistry.registerCloseable(inputStream);
 
 			outputStream = checkpointStreamFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 35b6bab..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -28,10 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
@@ -48,6 +44,7 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -64,7 +61,10 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -187,9 +187,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	}
 
 	private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception {
-		final Path tmpRestoreInstancePath = new Path(
-			instanceBasePath.getAbsolutePath(),
-			UUID.randomUUID().toString()); // used as restore source for IncrementalRemoteKeyedStateHandle
+		// used as restore source for IncrementalRemoteKeyedStateHandle
+		final Path tmpRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
 		try {
 			restoreFromLocalState(
 				transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
@@ -246,10 +245,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 
 	private void cleanUpPathQuietly(@Nonnull Path path) {
 		try {
-			FileSystem fileSystem = path.getFileSystem();
-			if (fileSystem.exists(path)) {
-				fileSystem.delete(path, true);
-			}
+			FileUtils.deleteDirectory(path.toFile());
 		} catch (IOException ex) {
 			LOG.warn("Failed to clean up path " + path, ex);
 		}
@@ -296,7 +292,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 					", but found " + rawStateHandle.getClass());
 			}
 
-			Path temporaryRestoreInstancePath = new Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
+			Path temporaryRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
 			try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
 				(IncrementalRemoteKeyedStateHandle) rawStateHandle,
 				temporaryRestoreInstancePath);
@@ -427,7 +423,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
 
 		RocksDB restoreDb = RocksDBOperationUtils.openDB(
-			temporaryRestoreInstancePath.getPath(),
+			temporaryRestoreInstancePath.toString(),
 			columnFamilyDescriptors,
 			columnFamilyHandles,
 			RocksDBOperationUtils.createColumnFamilyOptions(columnFamilyOptionsFactory, "default"),
@@ -461,26 +457,18 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	 * a local state.
 	 */
 	private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath) throws IOException {
+		final Path instanceRocksDBDirectory = Paths.get(instanceRocksDBPath);
+		final Path[] files = FileUtils.listDirectory(source);
 
-		FileSystem fileSystem = source.getFileSystem();
-
-		final FileStatus[] fileStatuses = fileSystem.listStatus(source);
-
-		if (fileStatuses == null) {
-			throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
-		}
-
-		for (FileStatus fileStatus : fileStatuses) {
-			final Path filePath = fileStatus.getPath();
-			final String fileName = filePath.getName();
-			File restoreFile = new File(source.getPath(), fileName);
-			File targetFile = new File(instanceRocksDBPath, fileName);
+		for (Path file : files) {
+			final String fileName = file.getFileName().toString();
+			final Path targetFile = instanceRocksDBDirectory.resolve(fileName);
 			if (fileName.endsWith(SST_FILE_SUFFIX)) {
 				// hardlink'ing the immutable sst-files.
-				Files.createLink(targetFile.toPath(), restoreFile.toPath());
+				Files.createLink(targetFile, file);
 			} else {
 				// true copy for all other files.
-				Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+				Files.copy(file, targetFile, StandardCopyOption.REPLACE_EXISTING);
 			}
 		}
 	}
@@ -490,7 +478,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 	 */
 	private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
 
-		FSDataInputStream inputStream = null;
+		InputStream inputStream = null;
 
 		try {
 			inputStream = metaStateHandle.openInputStream();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 16c72cc..23f4574 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
 import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -64,6 +62,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -200,7 +199,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 				FileUtils.deleteDirectory(rdbSnapshotDir);
 			}
 
-			Path path = new Path(rdbSnapshotDir.toURI());
+			Path path = rdbSnapshotDir.toPath();
 			// create a "permanent" snapshot directory because local recovery is active.
 			try {
 				return SnapshotDirectory.permanent(path);
@@ -246,7 +245,7 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 		try (
 			ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
 			Checkpoint checkpoint = Checkpoint.create(db)) {
-			checkpoint.createCheckpoint(outputDirectory.getDirectory().getPath());
+			checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
 		} catch (Exception ex) {
 			try {
 				outputDirectory.cleanup();
@@ -417,9 +416,9 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 			Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
 			Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
 
-			FileStatus[] fileStatuses = localBackupDirectory.listStatus();
-			if (fileStatuses != null) {
-				createUploadFilePaths(fileStatuses, sstFiles, sstFilePaths, miscFilePaths);
+			Path[] files = localBackupDirectory.listDirectory();
+			if (files != null) {
+				createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
 				sstFiles.putAll(stateUploader.uploadFilesToCheckpointFs(
 					sstFilePaths,
@@ -433,13 +432,12 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy
 		}
 
 		private void createUploadFilePaths(
-			FileStatus[] fileStatuses,
+			Path[] files,
 			Map<StateHandleID, StreamStateHandle> sstFiles,
 			Map<StateHandleID, Path> sstFilePaths,
 			Map<StateHandleID, Path> miscFilePaths) {
-			for (FileStatus fileStatus : fileStatuses) {
-				final Path filePath = fileStatus.getPath();
-				final String fileName = filePath.getName();
+			for (Path filePath : files) {
+				final String fileName = filePath.getFileName().toString();
 				final StateHandleID stateHandleID = new StateHandleID(fileName);
 
 				if (fileName.endsWith(SST_FILE_SUFFIX)) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index 76372b7..897b855 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -34,6 +33,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -91,7 +91,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
 		try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
 			rocksDBStateDownloader.transferAllStateDataToDirectory(
 				incrementalKeyedStateHandle,
-				new Path(temporaryFolder.newFolder().toURI()),
+				temporaryFolder.newFolder().toPath(),
 				new CloseableRegistry());
 			fail();
 		} catch (Exception e) {
@@ -133,13 +133,13 @@ public class RocksDBStateDownloaderTest extends TestLogger {
 				privateStates,
 				handles.get(0));
 
-		Path dstPath = new Path(temporaryFolder.newFolder().toURI());
+		Path dstPath = temporaryFolder.newFolder().toPath();
 		try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(5)) {
 			rocksDBStateDownloader.transferAllStateDataToDirectory(incrementalKeyedStateHandle, dstPath, new CloseableRegistry());
 		}
 
 		for (int i = 0; i < contentNum; ++i) {
-			assertStateContentEqual(contents[i], new Path(dstPath, String.format("sharedState%d", i)));
+			assertStateContentEqual(contents[i], dstPath.resolve(String.format("sharedState%d", i)));
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 8ac2f3c..1c99739 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -41,7 +40,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -71,7 +70,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 		generateRandomFileContent(file.getPath(), 20);
 
 		Map<StateHandleID, Path> filePaths = new HashMap<>(1);
-		filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
+		filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
 		try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
 			rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
 			fail();
@@ -86,10 +85,10 @@ public class RocksDBStateUploaderTest extends TestLogger {
 	@Test
 	public void testMultiThreadUploadCorrectly() throws Exception {
 		File checkpointPrivateFolder = temporaryFolder.newFolder("private");
-		Path checkpointPrivateDirectory = new Path(checkpointPrivateFolder.getPath());
+		org.apache.flink.core.fs.Path checkpointPrivateDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
 
 		File checkpointSharedFolder = temporaryFolder.newFolder("shared");
-		Path checkpointSharedDirectory = new Path(checkpointSharedFolder.getPath());
+		org.apache.flink.core.fs.Path checkpointSharedDirectory = org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
 
 		FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
 		int fileStateSizeThreshold = 1024;
@@ -157,7 +156,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 		for (int i = 0; i < sstFileCount; ++i) {
 			File file = temporaryFolder.newFile(String.format("%s/%d.sst", localFolder, i));
 			generateRandomFileContent(file.getPath(), random.nextInt(1_000_000) + fileStateSizeThreshold);
-			sstFilePaths.put(new StateHandleID(String.valueOf(i)), Path.fromLocalFile(file));
+			sstFilePaths.put(new StateHandleID(String.valueOf(i)), file.toPath());
 		}
 		return sstFilePaths;
 	}
@@ -171,7 +170,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
 	}
 
 	private void assertStateContentEqual(Path stateFilePath, FSDataInputStream inputStream) throws IOException {
-		byte[] excepted = Files.readAllBytes(Paths.get(stateFilePath.toUri()));
+		byte[] excepted = Files.readAllBytes(stateFilePath);
 		byte[] actual = new byte[excepted.length];
 		IOUtils.readFully(inputStream, actual, 0, actual.length);
 		assertEquals(-1, inputStream.read());