You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/12 11:06:56 UTC

flink git commit: [FLINK-5300] Add more gentle file deletion procedure

Repository: flink
Updated Branches:
  refs/heads/release-1.1 afaa27e9f -> f3d0cc3c1


[FLINK-5300] Add more gentle file deletion procedure

Before deleting a parent directory, always check whether it still contains any
files. Only if it is empty, try to delete the parent directory.

This gives a more gentle behaviour wrt storage systems, which are then no longer
instructed to delete a non-empty directory.

This closes #2971.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d0cc3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d0cc3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d0cc3c

Branch: refs/heads/release-1.1
Commit: f3d0cc3c13b95fd59ba7603722b30b42037591b7
Parents: afaa27e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 8 18:53:40 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Dec 12 12:06:13 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileOutputFormat.java   |  2 +-
 .../java/org/apache/flink/util/FileUtils.java   | 33 ++++++++++++++++++++
 .../filesystem/AbstractFileStateHandle.java     |  7 ++---
 .../state/filesystem/FsStateBackend.java        | 20 +++++++-----
 4 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..4a84512 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -315,7 +315,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
 			} catch (FileNotFoundException e) {
 				// ignore, may not be visible yet or may be already removed
 			} catch (Throwable t) {
-				LOG.error("Could not remove the incomplete file " + actualFilePath);
+				LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 078599d..85f4b30 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -81,6 +85,35 @@ public final class FileUtils {
 	public static void writeFileUtf8(File file, String contents) throws IOException {
 		writeFile(file, contents, "UTF-8");
 	}
+
+	// ------------------------------------------------------------------------
+	//  Deleting directories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Deletes the path if it is empty; a path whose contents cannot be listed is treated as empty.
+	 *
+	 * @param fileSystem the file system used to list and to delete the path
+	 * @param path the path to be deleted if empty
+	 * @return the result of the delete call if it was attempted; false if entries were listed
+	 * @throws IOException if the delete operation fails
+	 */
+	public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException {
+		FileStatus[] fileStatuses = null;
+
+		try {
+			fileStatuses = fileSystem.listStatus(path);
+		} catch (Exception ignored) {}
+
+		// a failed listing leaves fileStatuses null, which is handled like a path without entries
+		if (fileStatuses == null || fileStatuses.length == 0) {
+			// attempt the delete (presumably non-recursive, so it fails if files were added
+			// concurrently); the result or IOException is handed to the caller, not ignored here
+			return fileSystem.delete(path, false);
+		} else {
+			return false;
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
index e9d54dc..805c12d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,11 +73,9 @@ public abstract class AbstractFileStateHandle extends AbstractCloseableHandle im
 	public void discardState() throws Exception {
 		getFileSystem().delete(filePath, false);
 
-		// send a call to delete the checkpoint directory containing the file. This will
-		// fail (and be ignored) when some files still exist
 		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
+			FileUtils.deletePathIfEmpty(getFileSystem(), filePath.getParent());
+		} catch (Exception ignored) {}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e783264..c32fa26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -630,10 +631,11 @@ public class FsStateBackend extends AbstractStateBackend {
 						try {
 							fs.delete(statePath, false);
 
-							// attempt to delete the parent (will fail and be ignored if the parent has more files)
 							try {
-								fs.delete(basePath, false);
-							} catch (Throwable ignored) {}
+								FileUtils.deletePathIfEmpty(fs, basePath);
+							} catch (Throwable ignored) {
+								LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+							}
 						} catch (Throwable ioE) {
 							LOG.warn("Could not delete stream file for {}.", statePath, ioE);
 						}
@@ -669,8 +671,10 @@ public class FsStateBackend extends AbstractStateBackend {
 								fs.delete(statePath, false);
 
 								try {
-									fs.delete(basePath, false);
-								} catch (Throwable ignored) {}
+									FileUtils.deletePathIfEmpty(fs, basePath);
+								} catch (Throwable ignored) {
+									LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+								}
 							} catch (Throwable deleteException) {
 								LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
 							}
@@ -715,8 +719,10 @@ public class FsStateBackend extends AbstractStateBackend {
 							fs.delete(statePath, false);
 
 							try {
-								fs.delete(basePath, false);
-							} catch (Throwable ignored) {}
+								FileUtils.deletePathIfEmpty(fs, basePath);
+							} catch (Throwable ignored) {
+								LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+							}
 						} catch (Throwable deleteException) {
 							LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
 						}