Posted to dev@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2021/12/27 07:13:00 UTC
[jira] [Created] (FLINK-25453) LocalFileSystem#listStatus throws FileNotFoundException when a file is deleted concurrently in the directory
Caizhi Weng created FLINK-25453:
-----------------------------------
Summary: LocalFileSystem#listStatus throws FileNotFoundException when a file is deleted concurrently in the directory
Key: FLINK-25453
URL: https://issues.apache.org/jira/browse/FLINK-25453
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.14.2
Reporter: Caizhi Weng
Add the following test case to {{LocalFileSystemTest}} to reproduce this issue.
{code:java}
@Test
public void myTest() throws Exception {
    temporaryFolder.create();
    // Each thread creates its own file, lists the shared directory, then deletes its file.
    // Exceptions are only printed, so look for FileNotFoundException in the test output.
    Runnable r =
            () -> {
                try {
                    Path path = new Path(temporaryFolder.getRoot() + "/" + UUID.randomUUID());
                    FileSystem fs = path.getFileSystem();
                    try (FSDataOutputStream out =
                            fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
                        OutputStreamWriter writer =
                                new OutputStreamWriter(out, StandardCharsets.UTF_8);
                        writer.write("test");
                        writer.flush();
                    }
                    // Random delays interleave listings in some threads with deletions in others.
                    Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                    fs.listStatus(new Path(temporaryFolder.getRoot().toString()));
                    Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                    fs.delete(path, false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        Thread thread = new Thread(r);
        thread.start();
        threads.add(thread);
    }
    for (Thread thread : threads) {
        thread.join();
    }
}
{code}
Exception stack trace:
{code}
java.io.FileNotFoundException: File /var/folders/y9/hqm_j18s105g5n8_xq00rd7c0000gp/T/junit8680341925762938456/f3b7f8a3-7092-464a-af7e-e7f8465c041d does not exist or the user running Flink ('tsreaper') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:167)
at org.apache.flink.core.fs.local.LocalFileSystemTest.lambda$myTest$0(LocalFileSystemTest.java:90)
at java.lang.Thread.run(Thread.java:748)
{code}
This happens because {{listStatus}} is not atomic: {{LocalFileSystem}} first lists all file names in the directory and then queries the status of each file individually. If a file is deleted after the names have been listed but before its status is queried, {{getFileStatus}} throws a {{FileNotFoundException}}, which propagates out of {{listStatus}}.
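The race window can also be shown without threads by deleting an entry between the two steps. The following is a simplified, standalone model of the "list names, then query each status" pattern described above; it uses only {{java.io.File}} and {{java.nio.file.Files}}, not Flink's {{LocalFileSystem}}, and the class and method names ({{NonAtomicListing}}, {{statusOf}}, {{reproduceRace}}) are hypothetical.

{code:java}
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical, simplified model of a non-atomic "list, then stat" directory listing.
// It mirrors the shape of the problem, not Flink's actual implementation.
final class NonAtomicListing {

    // Step 2: look up one entry; fails if the entry no longer exists.
    static long statusOf(File file) throws FileNotFoundException {
        if (!file.exists()) {
            throw new FileNotFoundException("File " + file + " does not exist");
        }
        return file.length();
    }

    // Takes the listing (step 1), deletes an entry, and only then queries each
    // entry's status (step 2). Returns true if the lookup hit the missing file.
    static boolean reproduceRace() throws IOException {
        File dir = Files.createTempDirectory("listStatus-race").toFile();
        File victim = new File(dir, "victim");
        Files.write(victim.toPath(), new byte[] {1, 2, 3});

        String[] names = dir.list();           // step 1: snapshot of the names
        Files.delete(victim.toPath());         // simulated concurrent deletion
        try {
            for (String name : names) {
                statusOf(new File(dir, name)); // step 2: per-entry lookup
            }
            return false;                      // no exception: race not hit
        } catch (FileNotFoundException expected) {
            return true;                       // same exception type as in the report
        } finally {
            Files.deleteIfExists(dir.toPath()); // directory is empty by now
        }
    }
}
{code}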
Hadoop's {{RawLocalFileSystem}} handles this by ignoring {{FileNotFoundException}} for entries that disappear between the listing and the status lookup:
{code:java}
for (int i = 0; i < names.length; i++) {
    try {
        // Assemble the path using the Path 3 arg constructor to make sure
        // paths with colon are properly resolved on Linux
        results[j] = getFileStatus(new Path(f, new Path(null, null, names[i])));
        j++;
    } catch (FileNotFoundException e) {
        // ignore the files not found since the dir list may have changed
        // since the names[] list was generated.
    }
}
{code}
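For comparison, here is a standalone sketch of the same skip-on-{{FileNotFoundException}} idea. It is not the fix adopted for FLINK-25453 and does not use Flink classes: a plain {{java.io.File}} stands in for a status object, and the names ({{TolerantListing}}, {{statusOf}}, {{listStatus}}) are hypothetical. As with the counter {{j}} in the Hadoop snippet, only entries that were actually found are kept, and the array is trimmed so callers never see {{null}} slots.

{code:java}
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Arrays;

// Hypothetical sketch of a listing that tolerates concurrent deletions,
// following the skip-on-FileNotFoundException idea shown above.
final class TolerantListing {

    // Per-entry lookup; throws if the entry has disappeared.
    static File statusOf(File file) throws FileNotFoundException {
        if (!file.exists()) {
            throw new FileNotFoundException("File " + file + " does not exist");
        }
        return file;
    }

    // Looks up each name from an earlier snapshot of the directory,
    // skipping names whose files were deleted in the meantime.
    static File[] listStatus(File dir, String[] names) {
        File[] results = new File[names.length];
        int j = 0; // number of entries actually found
        for (String name : names) {
            try {
                results[j] = statusOf(new File(dir, name));
                j++;
            } catch (FileNotFoundException e) {
                // Ignore: the directory changed after 'names' was generated.
            }
        }
        // Trim unused slots so the result contains no nulls.
        return j == names.length ? results : Arrays.copyOf(results, j);
    }
}
{code}

Taking {{names}} as a parameter keeps the sketch deterministic to test; a real listing would obtain it from {{dir.list()}} just before the loop. The trade-off is that a caller may receive a listing that omits files deleted during the call, which is the most a non-atomic listing can promise anyway.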
--
This message was sent by Atlassian Jira
(v8.20.1#820001)