You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:10 UTC

[2/5] git commit: [FLINK-1202] Remove incomplete file outputs on failure

[FLINK-1202] Remove incomplete file outputs on failure

This closes #175


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a747b614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a747b614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a747b614

Branch: refs/heads/master
Commit: a747b6146ed5d5766b42e6bed3c2e7a811e8d00e
Parents: f42dcc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 11:47:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:46 2014 +0100

----------------------------------------------------------------------
 .../api/common/io/CleanupWhenUnsuccessful.java  | 32 +++++++++++++++++
 .../flink/api/common/io/FileOutputFormat.java   | 37 +++++++++++++++-----
 .../flink/runtime/operators/DataSinkTask.java   | 26 ++++++++++++++
 .../runtime/operators/DataSinkTaskTest.java     |  7 ++--
 4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
new file mode 100644
index 0000000..4b912e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+/**
+ * {@link OutputFormat}s may implement this interface to run a cleanup hook (for example, removing partially written output) when the execution is not successful.
+ */
+public interface CleanupWhenUnsuccessful {
+	
+	/**
+	 * Hook that is called upon an unsuccessful execution. {@code DataSinkTask} invokes it as a best effort: anything thrown here is logged by the task, not propagated.
+	 * 
+	 * @throws Exception The method may forward exceptions when the cleanup fails.
+	 */
+	void tryCleanupOnError() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index a9beddb..bc7ab73 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.slf4j.Logger;
@@ -35,7 +36,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
  * The abstract base class for all output formats that are file based. Contains the logic to open/close the target
  * file streams.
  */
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -106,10 +107,14 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	
 	// --------------------------------------------------------------------------------------------
 	
-	/**
-	 * The stream to which the data is written;
-	 */
+	/** The stream to which the data is written; */
 	protected transient FSDataOutputStream stream;
+	
+	/** The path that is actually written to (may be a file in the directory defined by {@code outputFilePath}) */
+	private transient Path actualFilePath;
+	
+	/** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
+	private transient boolean fileCreated;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -231,12 +236,13 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 			
 			
 		// Suffix the path with the parallel instance index, if needed
-		if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
-			p = p.suffix("/" + (taskNumber+1));
-		}
+		this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;
 
 		// create output file
-		this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+		this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+		
+		// at this point, the file creation must have succeeded, or an exception has been thrown
+		this.fileCreated = true;
 	}
 
 	@Override
@@ -283,6 +289,21 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 		}
 	}
 	
+	@Override
+	public void tryCleanupOnError() {
+		if (this.fileCreated) {
+			this.fileCreated = false;
+			// best effort: remove the file this format created; failures are logged, never thrown
+			try {
+				FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
+			} catch (FileNotFoundException e) {
+				// ignore, may not be visible yet or may be already removed
+			} catch (Throwable t) {
+				LOG.error("Could not remove the incomplete file " + actualFilePath, t);
+			}
+		}
+	}
+	
 	// ============================================================================================
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1a378b2..b1185c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -78,6 +79,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	// cancel flag
 	private volatile boolean taskCanceled;
 	
+	private volatile boolean cleanupCalled;
+	
 
 	@Override
 	public void registerInputOutput() {
@@ -180,6 +183,18 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			}
 		}
 		catch (Exception ex) {
+			
+			// make a best effort to clean up
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
+			
 			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
 
 			if (ex instanceof CancelTaskException) {
@@ -237,6 +252,17 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			try {
 				this.format.close();
 			} catch (Throwable t) {}
+			
+			// make a best effort to clean up
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
 		}
 		
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f5381e4..3219a21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -306,7 +306,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 		
 	}
 	
@@ -347,7 +347,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 		
 	}
 	
@@ -388,8 +388,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		
 		// assert that temp file was created
 		File tempTestFile = new File(this.tempTestPath);
-		Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-				
+		Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
 	}
 	
 	@Test