You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/15 01:54:00 UTC

[flink] 02/02: [FLINK-18265][fs-connector] temp path in FileSystemOutputFormat should be deleted

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3916da093fadd25bf54417f3be5e91f4720d90e5
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Jun 12 14:31:06 2020 +0800

    [FLINK-18265][fs-connector] temp path in FileSystemOutputFormat should be deleted
    
    This closes #12628
---
 .../org/apache/flink/table/filesystem/FileSystemOutputFormat.java    | 3 +++
 .../apache/flink/table/filesystem/FileSystemOutputFormatTest.java    | 5 +++++
 2 files changed, 8 insertions(+)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
index 56011ab..0f35107 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.api.TableException;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
@@ -94,6 +95,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 			committer.commitUpToCheckpoint(CHECKPOINT_ID);
 		} catch (Exception e) {
 			throw new TableException("Exception in finalizeGlobal", e);
+		} finally {
+			new File(tmpPath.getPath()).delete();
 		}
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
index 80118ff..520d56d 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemOutputFormatTest.java
@@ -42,6 +42,7 @@ import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 /**
  * Test for {@link FileSystemOutputFormat}.
@@ -128,6 +129,7 @@ public class FileSystemOutputFormatTest {
 		assertEquals(
 				"a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n",
 				content.values().iterator().next());
+		assertFalse(new File(tmpFile.toURI()).exists());
 	}
 
 	@Test
@@ -154,6 +156,7 @@ public class FileSystemOutputFormatTest {
 		assertEquals(
 				"a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n",
 				content.values().iterator().next());
+		assertFalse(new File(tmpFile.toURI()).exists());
 	}
 
 	@Test
@@ -173,6 +176,7 @@ public class FileSystemOutputFormatTest {
 		assertEquals(2, sortedContent.size());
 		assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", sortedContent.get("c=p1"));
 		assertEquals("a2,2\n", sortedContent.get("c=p2"));
+		assertFalse(new File(tmpFile.toURI()).exists());
 	}
 
 	@Test
@@ -198,6 +202,7 @@ public class FileSystemOutputFormatTest {
 		assertEquals(2, sortedContent.size());
 		assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", sortedContent.get("c=p1"));
 		assertEquals("a2,2\n", sortedContent.get("c=p2"));
+		assertFalse(new File(tmpFile.toURI()).exists());
 	}
 
 	private OneInputStreamOperatorTestHarness<Row, Object> createSink(