You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/17 05:38:16 UTC

[flink] branch release-1.13 updated: [FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 3200e8e  [FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink
3200e8e is described below

commit 3200e8ef43b3024b0b44f184dfa833d1aa7d7d75
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Dec 17 13:37:47 2021 +0800

    [FLINK-24728][table-runtime-blink] Close output stream in batch SQL file sink
    
    This closes #18074
---
 .../org/apache/flink/testutils/TestFileSystem.java | 77 ++++++++++++++++++++--
 .../planner/runtime/FileSystemITCaseBase.scala     | 14 ++--
 .../batch/sql/FileSystemTestCsvITCase.scala        | 17 +++++
 .../stream/sql/StreamFileSystemTestCsvITCase.scala | 17 +++++
 .../table/filesystem/FileSystemTableSink.java      |  7 +-
 5 files changed, 118 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 8a343f0..f04837a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,15 +19,21 @@
 package org.apache.flink.testutils;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
 import org.apache.flink.core.fs.local.LocalFileStatus;
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A test file system. This also has a service entry in the test resources, to be loaded during
@@ -37,29 +43,45 @@ public class TestFileSystem extends LocalFileSystem {
 
     public static final String SCHEME = "test";
 
-    private static int streamOpenCounter;
+    // number of (input) streams opened
+    private static final AtomicInteger streamOpenCounter = new AtomicInteger(0);
+
+    // current number of created, unclosed (output) streams
+    private static final Map<Path, Integer> currentUnclosedOutputStream = new ConcurrentHashMap<>();
 
     public static int getNumtimeStreamOpened() {
-        return streamOpenCounter;
+        return streamOpenCounter.get();
     }
 
     public static void resetStreamOpenCounter() {
-        streamOpenCounter = 0;
+        streamOpenCounter.set(0);
+    }
+
+    public static int getNumberOfUnclosedOutputStream(Path path) {
+        return currentUnclosedOutputStream.getOrDefault(path, 0);
     }
 
     @Override
     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        streamOpenCounter++;
+        streamOpenCounter.incrementAndGet();
         return super.open(f, bufferSize);
     }
 
     @Override
     public FSDataInputStream open(Path f) throws IOException {
-        streamOpenCounter++;
+        streamOpenCounter.incrementAndGet();
         return super.open(f);
     }
 
     @Override
+    public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
+            throws IOException {
+        currentUnclosedOutputStream.compute(filePath, (k, v) -> v == null ? 1 : v + 1);
+        LocalDataOutputStream stream = (LocalDataOutputStream) super.create(filePath, overwrite);
+        return new TestOutputStream(stream, filePath);
+    }
+
+    @Override
     public FileStatus getFileStatus(Path f) throws IOException {
         LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
         return new LocalFileStatus(status.getFile(), this);
@@ -82,6 +104,51 @@ public class TestFileSystem extends LocalFileSystem {
 
     // ------------------------------------------------------------------------
 
+    private static final class TestOutputStream extends FSDataOutputStream {
+
+        private final LocalDataOutputStream stream;
+        private final Path path;
+
+        private TestOutputStream(LocalDataOutputStream stream, Path path) {
+            this.stream = stream;
+            this.path = path;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return stream.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            stream.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            stream.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            stream.sync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            currentUnclosedOutputStream.compute(
+                    path, (k, v) -> Preconditions.checkNotNull(v) == 1 ? null : v - 1);
+            stream.close();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
     public static final class TestFileSystemFactory implements FileSystemFactory {
 
         @Override
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..32a08d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -49,6 +49,8 @@ trait FileSystemITCaseBase {
 
   def formatProperties(): Array[String] = Array()
 
+  def getScheme: String = "file"
+
   def tableEnv: TableEnvironment
 
   def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
@@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
   }
 
   def open(): Unit = {
-    resultPath = fileTmpFolder.newFolder().toURI.toString
+    resultPath = fileTmpFolder.newFolder().toURI.getPath
     BatchTableEnvUtil.registerCollection(
       tableEnv,
       "originalT",
@@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
          |  c as b + 1
          |) partitioned by (a, b) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
          |  b bigint
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
          |  x decimal(10, 0), y int
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
          |  x decimal(3, 2), y int
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
       "partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()
 
     // create hidden partition dir
-    assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+    assertTrue(new File(new Path("file:" + resultPath + "/a=1/.b=2").toUri).mkdir())
 
     check(
       "select x, y from partitionedTable",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
index baf2cb7..0d768b6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
 /**
   * Test for file system table factory with testcsv format.
   */
@@ -26,4 +32,15 @@ class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase {
   override def formatProperties(): Array[String] = {
     super.formatProperties() ++ Seq("'format' = 'testcsv'")
   }
+
+  override def getScheme: String = "test"
+
+  @After
+  def close(): Unit = {
+    val path = new Path(resultPath)
+    assertEquals(
+      s"File $resultPath is not closed",
+      0,
+      TestFileSystem.getNumberOfUnclosedOutputStream(path))
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..a9a96c1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
 import scala.collection.Seq
 
 /**
@@ -28,4 +34,15 @@ class StreamFileSystemTestCsvITCase extends StreamFileSystemITCaseBase {
   override def formatProperties(): Array[String] = {
     super.formatProperties() ++ Seq("'format' = 'testcsv'")
   }
+
+  override def getScheme: String = "test"
+
+  @After
+  def close(): Unit = {
+    val path = new Path(resultPath)
+    assertEquals(
+      s"File $resultPath is not closed",
+      0,
+      TestFileSystem.getNumberOfUnclosedOutputStream(path))
+  }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 31865d8..5395529 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -416,15 +416,15 @@ public class FileSystemTableSink extends AbstractFileSystemTable
             private static final long serialVersionUID = 1L;
 
             private transient BulkWriter<RowData> writer;
+            private transient FSDataOutputStream stream;
 
             @Override
             public void configure(Configuration parameters) {}
 
             @Override
             public void open(int taskNumber, int numTasks) throws IOException {
-                this.writer =
-                        factory.create(
-                                path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+                this.stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
+                this.writer = factory.create(stream);
             }
 
             @Override
@@ -436,6 +436,7 @@ public class FileSystemTableSink extends AbstractFileSystemTable
             public void close() throws IOException {
                 writer.flush();
                 writer.finish();
+                stream.close();
             }
         };
     }