You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/17 05:38:16 UTC
[flink] branch release-1.13 updated: [FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 3200e8e [FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink
3200e8e is described below
commit 3200e8ef43b3024b0b44f184dfa833d1aa7d7d75
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Dec 17 13:37:47 2021 +0800
[FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink
This closes #18074
---
.../org/apache/flink/testutils/TestFileSystem.java | 77 ++++++++++++++++++++--
.../planner/runtime/FileSystemITCaseBase.scala | 14 ++--
.../batch/sql/FileSystemTestCsvITCase.scala | 17 +++++
.../stream/sql/StreamFileSystemTestCsvITCase.scala | 17 +++++
.../table/filesystem/FileSystemTableSink.java | 7 +-
5 files changed, 118 insertions(+), 14 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 8a343f0..f04837a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,15 +19,21 @@
package org.apache.flink.testutils;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileStatus;
import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A test file system. This also has a service entry in the test resources, to be loaded during
@@ -37,29 +43,45 @@ public class TestFileSystem extends LocalFileSystem {
public static final String SCHEME = "test";
- private static int streamOpenCounter;
+ // number of (input) streams opened
+ private static final AtomicInteger streamOpenCounter = new AtomicInteger(0);
+
+ // current number of created but not yet closed (output) streams, keyed by path
+ private static final Map<Path, Integer> currentUnclosedOutputStream = new ConcurrentHashMap<>();
public static int getNumtimeStreamOpened() {
- return streamOpenCounter;
+ return streamOpenCounter.get();
}
public static void resetStreamOpenCounter() {
- streamOpenCounter = 0;
+ streamOpenCounter.set(0);
+ }
+
+ /**
+  * Returns how many output streams created for {@code path} are currently recorded as unclosed.
+  *
+  * <p>The lookup is an exact match on the key stored in {@code currentUnclosedOutputStream};
+  * a path with no entry yields {@code 0}.
+  *
+  * @param path the path to look up
+  * @return the recorded number of unclosed output streams for that path, or {@code 0}
+  */
+ public static int getNumberOfUnclosedOutputStream(Path path) {
+ return currentUnclosedOutputStream.getOrDefault(path, 0);
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- streamOpenCounter++;
+ streamOpenCounter.incrementAndGet();
return super.open(f, bufferSize);
}
@Override
public FSDataInputStream open(Path f) throws IOException {
- streamOpenCounter++;
+ streamOpenCounter.incrementAndGet();
return super.open(f);
}
@Override
+ public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
+         throws IOException {
+     // Create the underlying stream first: if super.create(...) throws, no stream exists,
+     // so nothing may be recorded as "unclosed" for this path.
+     LocalDataOutputStream stream = (LocalDataOutputStream) super.create(filePath, overwrite);
+     // Count the stream only once it really exists; TestOutputStream#close() undoes this.
+     currentUnclosedOutputStream.compute(filePath, (k, v) -> v == null ? 1 : v + 1);
+     return new TestOutputStream(stream, filePath);
+ }
+
+ @Override
public FileStatus getFileStatus(Path f) throws IOException {
LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
return new LocalFileStatus(status.getFile(), this);
@@ -82,6 +104,51 @@ public class TestFileSystem extends LocalFileSystem {
// ------------------------------------------------------------------------
+ /**
+  * Wrapper around a {@link LocalDataOutputStream} that decrements the per-path entry in
+  * {@code currentUnclosedOutputStream} when it is closed, so tests can detect output streams
+  * that were created but never closed.
+  *
+  * <p>All read/write/flush/sync calls are forwarded unchanged to the wrapped stream.
+  */
+ private static final class TestOutputStream extends FSDataOutputStream {
+
+     private final LocalDataOutputStream stream;
+     private final Path path;
+
+     // Set by the first close(); later calls must not touch the per-path counter again.
+     // Plain field: concurrent close() calls on the same stream are not guarded.
+     private boolean closed;
+
+     private TestOutputStream(LocalDataOutputStream stream, Path path) {
+         this.stream = stream;
+         this.path = path;
+     }
+
+     @Override
+     public long getPos() throws IOException {
+         return stream.getPos();
+     }
+
+     @Override
+     public void write(int b) throws IOException {
+         stream.write(b);
+     }
+
+     @Override
+     public void write(byte[] b, int off, int len) throws IOException {
+         stream.write(b, off, len);
+     }
+
+     @Override
+     public void flush() throws IOException {
+         stream.flush();
+     }
+
+     @Override
+     public void sync() throws IOException {
+         stream.sync();
+     }
+
+     /**
+      * Closes the wrapped stream and decrements the unclosed-stream count for this path.
+      *
+      * <p>Calling this method again has no effect. Without the guard, a repeated call would
+      * either fail the null check (entry already removed) or decrement the count that belongs
+      * to another stream still open on the same path.
+      *
+      * @throws IOException if closing the wrapped stream fails
+      */
+     @Override
+     public void close() throws IOException {
+         if (closed) {
+             return;
+         }
+         closed = true;
+         currentUnclosedOutputStream.compute(
+                 path, (k, v) -> Preconditions.checkNotNull(v) == 1 ? null : v - 1);
+         stream.close();
+     }
+ }
+
+ // ------------------------------------------------------------------------
+
public static final class TestFileSystemFactory implements FileSystemFactory {
@Override
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..32a08d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -49,6 +49,8 @@ trait FileSystemITCaseBase {
def formatProperties(): Array[String] = Array()
+ def getScheme: String = "file"
+
def tableEnv: TableEnvironment
def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
@@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
}
def open(): Unit = {
- resultPath = fileTmpFolder.newFolder().toURI.toString
+ resultPath = fileTmpFolder.newFolder().toURI.getPath
BatchTableEnvUtil.registerCollection(
tableEnv,
"originalT",
@@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
| c as b + 1
|) partitioned by (a, b) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
| b bigint
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
| x decimal(10, 0), y int
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
| x decimal(3, 2), y int
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
"partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()
// create hidden partition dir
- assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+ assertTrue(new File(new Path("file:" + resultPath + "/a=1/.b=2").toUri).mkdir())
check(
"select x, y from partitionedTable",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
index baf2cb7..0d768b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
package org.apache.flink.table.planner.runtime.batch.sql
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
/**
* Test for file system table factory with testcsv format.
*/
@@ -26,4 +32,15 @@ class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase {
override def formatProperties(): Array[String] = {
super.formatProperties() ++ Seq("'format' = 'testcsv'")
}
+
+ override def getScheme: String = "test"
+
+ // After each test, checks that TestFileSystem reports no unclosed output stream for
+ // the result path.
+ // NOTE(review): TestFileSystem keys its counter by the exact Path handed to create(),
+ // and looks it up with an exact match. resultPath here is the table's root folder
+ // (without the "test" scheme), so this lookup may never match the files the sink
+ // actually created -- confirm that this assertion is able to fail.
+ @After
+ def close(): Unit = {
+ val path = new Path(resultPath)
+ assertEquals(
+ s"File $resultPath is not closed",
+ 0,
+ TestFileSystem.getNumberOfUnclosedOutputStream(path))
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..a9a96c1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
package org.apache.flink.table.planner.runtime.stream.sql
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
import scala.collection.Seq
/**
@@ -28,4 +34,15 @@ class StreamFileSystemTestCsvITCase extends StreamFileSystemITCaseBase {
override def formatProperties(): Array[String] = {
super.formatProperties() ++ Seq("'format' = 'testcsv'")
}
+
+ override def getScheme: String = "test"
+
+ // After each test, checks that TestFileSystem reports no unclosed output stream for
+ // the result path.
+ // NOTE(review): TestFileSystem keys its counter by the exact Path handed to create(),
+ // and looks it up with an exact match. resultPath here is the table's root folder
+ // (without the "test" scheme), so this lookup may never match the files the sink
+ // actually created -- confirm that this assertion is able to fail.
+ @After
+ def close(): Unit = {
+ val path = new Path(resultPath)
+ assertEquals(
+ s"File $resultPath is not closed",
+ 0,
+ TestFileSystem.getNumberOfUnclosedOutputStream(path))
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 31865d8..5395529 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -416,15 +416,15 @@ public class FileSystemTableSink extends AbstractFileSystemTable
private static final long serialVersionUID = 1L;
private transient BulkWriter<RowData> writer;
+ private transient FSDataOutputStream stream;
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- this.writer =
- factory.create(
- path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+ this.stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
+ this.writer = factory.create(stream);
}
@Override
@@ -436,6 +436,7 @@ public class FileSystemTableSink extends AbstractFileSystemTable
public void close() throws IOException {
writer.flush();
writer.finish();
+ stream.close();
}
};
}