You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/15 01:54:00 UTC
[flink] 02/02: [FLINK-18265][fs-connector] temp path in
FileSystemOutputFormat should be deleted
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3916da093fadd25bf54417f3be5e91f4720d90e5
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Jun 12 14:31:06 2020 +0800
[FLINK-18265][fs-connector] temp path in FileSystemOutputFormat should be deleted
This closes #12628
---
.../org/apache/flink/table/filesystem/FileSystemOutputFormat.java | 3 +++
.../apache/flink/table/filesystem/FileSystemOutputFormatTest.java | 5 +++++
2 files changed, 8 insertions(+)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
index 56011ab..0f35107 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableException;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedHashMap;
@@ -94,6 +95,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
committer.commitUpToCheckpoint(CHECKPOINT_ID);
} catch (Exception e) {
throw new TableException("Exception in finalizeGlobal", e);
+ } finally {
+ new File(tmpPath.getPath()).delete();
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
index 80118ff..520d56d 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
@@ -42,6 +42,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
/**
* Test for {@link FileSystemOutputFormat}.
@@ -128,6 +129,7 @@ public class FileSystemOutputFormatTest {
assertEquals(
"a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n",
content.values().iterator().next());
+ assertFalse(new File(tmpFile.toURI()).exists());
}
@Test
@@ -154,6 +156,7 @@ public class FileSystemOutputFormatTest {
assertEquals(
"a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n",
content.values().iterator().next());
+ assertFalse(new File(tmpFile.toURI()).exists());
}
@Test
@@ -173,6 +176,7 @@ public class FileSystemOutputFormatTest {
assertEquals(2, sortedContent.size());
assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", sortedContent.get("c=p1"));
assertEquals("a2,2\n", sortedContent.get("c=p2"));
+ assertFalse(new File(tmpFile.toURI()).exists());
}
@Test
@@ -198,6 +202,7 @@ public class FileSystemOutputFormatTest {
assertEquals(2, sortedContent.size());
assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", sortedContent.get("c=p1"));
assertEquals("a2,2\n", sortedContent.get("c=p2"));
+ assertFalse(new File(tmpFile.toURI()).exists());
}
private OneInputStreamOperatorTestHarness<Row, Object> createSink(