You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/12 11:06:56 UTC
flink git commit: [FLINK-5300] Add more gentle file deletion procedure
Repository: flink
Updated Branches:
refs/heads/release-1.1 afaa27e9f -> f3d0cc3c1
[FLINK-5300] Add more gentle file deletion procedure
Before deleting a parent directory, always check whether the directory still contains any
files. Only if it does not, try to delete the parent directory.
This gives a more gentle behaviour with respect to storage systems, which are then no longer
instructed to delete a non-empty directory.
This closes #2971.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d0cc3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d0cc3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d0cc3c
Branch: refs/heads/release-1.1
Commit: f3d0cc3c13b95fd59ba7603722b30b42037591b7
Parents: afaa27e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 8 18:53:40 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Dec 12 12:06:13 2016 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FileOutputFormat.java | 2 +-
.../java/org/apache/flink/util/FileUtils.java | 33 ++++++++++++++++++++
.../filesystem/AbstractFileStateHandle.java | 7 ++---
.../state/filesystem/FsStateBackend.java | 20 +++++++-----
4 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..4a84512 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -315,7 +315,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
} catch (FileNotFoundException e) {
// ignore, may not be visible yet or may be already removed
} catch (Throwable t) {
- LOG.error("Could not remove the incomplete file " + actualFilePath);
+ LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 078599d..85f4b30 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,10 @@
package org.apache.flink.util;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -81,6 +85,35 @@ public final class FileUtils {
public static void writeFileUtf8(File file, String contents) throws IOException {
writeFile(file, contents, "UTF-8");
}
+
+ // ------------------------------------------------------------------------
+ // Deleting directories
+ // ------------------------------------------------------------------------
+
+ /**
+ * Deletes the path if it is empty (does not contain any other directories/files).
+ *
+ * <p>The contents are looked up with {@code fileSystem.listStatus(path)}. If that call throws,
+ * the exception is swallowed and the path is handled like an empty one, i.e. the delete is
+ * still attempted. If the listing contains at least one entry, nothing is deleted.
+ *
+ * <p>The emptiness check and the delete are two separate calls, so entries may be added to
+ * the path in between. The delete is issued with {@code false} as its second argument
+ * (presumably the "recursive" flag; confirm against {@code FileSystem#delete}).
+ *
+ * @param fileSystem to use
+ * @param path to be deleted if empty
+ * @return the result of {@code fileSystem.delete(path, false)} if the delete was attempted;
+ * {@code false} if the listing showed that the path still contains entries
+ * @throws IOException if the delete operation fails
+ */
+ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException {
+ FileStatus[] fileStatuses = null;
+
+ try {
+ fileStatuses = fileSystem.listStatus(path);
+ } catch (Exception ignored) {} // listing is best effort; fileStatuses stays null if it fails
+
+ // if there are no more files, or if we could not list the file status, try to delete the path
+ if (fileStatuses == null || fileStatuses.length == 0) {
+ // attempt to delete the path; a failure (e.g. the path now contains some files, possibly
+ // added concurrently) is not handled here: the result/exception is passed on to the caller
+ return fileSystem.delete(path, false);
+ } else {
+ return false;
+ }
+ }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
index e9d54dc..805c12d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractCloseableHandle;
import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,11 +73,9 @@ public abstract class AbstractFileStateHandle extends AbstractCloseableHandle im
public void discardState() throws Exception {
getFileSystem().delete(filePath, false);
- // send a call to delete the checkpoint directory containing the file. This will
- // fail (and be ignored) when some files still exist
try {
- getFileSystem().delete(filePath.getParent(), false);
- } catch (IOException ignored) {}
+ FileUtils.deletePathIfEmpty(getFileSystem(), filePath.getParent());
+ } catch (Exception ignored) {}
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e783264..c32fa26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -630,10 +631,11 @@ public class FsStateBackend extends AbstractStateBackend {
try {
fs.delete(statePath, false);
- // attempt to delete the parent (will fail and be ignored if the parent has more files)
try {
- fs.delete(basePath, false);
- } catch (Throwable ignored) {}
+ FileUtils.deletePathIfEmpty(fs, basePath);
+ } catch (Throwable ignored) {
+ LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+ }
} catch (Throwable ioE) {
LOG.warn("Could not delete stream file for {}.", statePath, ioE);
}
@@ -669,8 +671,10 @@ public class FsStateBackend extends AbstractStateBackend {
fs.delete(statePath, false);
try {
- fs.delete(basePath, false);
- } catch (Throwable ignored) {}
+ FileUtils.deletePathIfEmpty(fs, basePath);
+ } catch (Throwable ignored) {
+ LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+ }
} catch (Throwable deleteException) {
LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
}
@@ -715,8 +719,10 @@ public class FsStateBackend extends AbstractStateBackend {
fs.delete(statePath, false);
try {
- fs.delete(basePath, false);
- } catch (Throwable ignored) {}
+ FileUtils.deletePathIfEmpty(fs, basePath);
+ } catch (Throwable ignored) {
+ LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+ }
} catch (Throwable deleteException) {
LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
}