You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:10 UTC
[2/5] git commit: [FLINK-1202] Remove incomplete file outputs on
failure
[FLINK-1202] Remove incomplete file outputs on failure
This closes #175
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a747b614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a747b614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a747b614
Branch: refs/heads/master
Commit: a747b6146ed5d5766b42e6bed3c2e7a811e8d00e
Parents: f42dcc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 11:47:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:46 2014 +0100
----------------------------------------------------------------------
.../api/common/io/CleanupWhenUnsuccessful.java | 32 +++++++++++++++++
.../flink/api/common/io/FileOutputFormat.java | 37 +++++++++++++++-----
.../flink/runtime/operators/DataSinkTask.java | 26 ++++++++++++++
.../runtime/operators/DataSinkTaskTest.java | 7 ++--
4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
new file mode 100644
index 0000000..4b912e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+/**
+ * Mix-in for {@link OutputFormat}s that need to clean up (e.g. remove partially written output) after a failed execution.
+ */
+public interface CleanupWhenUnsuccessful {
+ /**
+ * Invoked once an execution did not finish successfully. Implementations should clean up on a
+ * best-effort basis and tolerate being called when little or nothing has been written yet.
+ *
+ * @throws Exception if the cleanup itself fails; the caller decides whether to log or rethrow it.
+ */
+ void tryCleanupOnError() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index a9beddb..bc7ab73 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.io;
+import java.io.FileNotFoundException;
import java.io.IOException;
import org.slf4j.Logger;
@@ -35,7 +36,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {
private static final long serialVersionUID = 1L;
@@ -106,10 +107,14 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
// --------------------------------------------------------------------------------------------
- /**
- * The stream to which the data is written;
- */
+ /** The stream to which the data is written. */
+ protected transient FSDataOutputStream stream;
+
+ /** The path that is actually written to (may be a file inside the directory defined by {@code outputFilePath}). */
+ private transient Path actualFilePath;
+
+ /** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
+ private transient boolean fileCreated;
// --------------------------------------------------------------------------------------------
@@ -231,12 +236,13 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
// Suffix the path with the parallel instance index, if needed
- if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
- p = p.suffix("/" + (taskNumber+1));
- }
+ this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;
// create output file
- this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+ this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+
+ // at this point, the file creation must have succeeded, or an exception has been thrown
+ this.fileCreated = true;
}
@Override
@@ -283,6 +289,21 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
}
}
+ @Override
+ public void tryCleanupOnError() {
+ if (this.fileCreated) {
+ this.fileCreated = false;
+
+ try {
+ FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
+ } catch (FileNotFoundException e) {
+ // ignore, may not be visible yet or may be already removed
+ } catch (Throwable t) {
+ LOG.error("Could not remove the incomplete file " + actualFilePath);
+ }
+ }
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1a378b2..b1185c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -78,6 +79,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// cancel flag
private volatile boolean taskCanceled;
+ private volatile boolean cleanupCalled;
+
@Override
public void registerInputOutput() {
@@ -180,6 +183,18 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
}
catch (Exception ex) {
+
+ // make a best effort to clean up
+ try {
+ if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+ cleanupCalled = true;
+ ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Cleanup on error failed.", t);
+ }
+
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
if (ex instanceof CancelTaskException) {
@@ -237,6 +252,17 @@ public class DataSinkTask<IT> extends AbstractInvokable {
try {
this.format.close();
} catch (Throwable t) {}
+
+ // make a best effort to clean up
+ try {
+ if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+ cleanupCalled = true;
+ ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Cleanup on error failed.", t);
+ }
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f5381e4..3219a21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -306,7 +306,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@@ -347,7 +347,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@@ -388,8 +388,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@Test