You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ja...@apache.org on 2022/10/26 10:03:10 UTC

[incubator-linkis] branch dev-1.3.1 updated: Fix writer the file is emptied by calling flush after calling the close method close #3709

This is an automated email from the ASF dual-hosted git repository.

jackxu2011 pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 487482086 Fix writer the file is emptied by calling flush after calling the close method close #3709
     new 5d49bf9e5 Merge pull request #3710 from WeDataSphere/dev-1.3.1-storage-fix
487482086 is described below

commit 4874820868664f62fb04ef9c6c6152eb0936d4f1
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Oct 24 20:46:38 2022 +0800

    Fix writer the file is emptied by calling flush after calling the close method close #3709
---
 .../storage/resultset/StorageResultSetWriter.scala | 50 +++++++++++++++++++---
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index 9d56adc45..9c7947272 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -59,6 +59,14 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
 
   private var proxyUser: String = StorageUtils.getJvmUser
 
+  private var fileCreated = false
+
+  private var closed = false
+
+  private val WRITER_LOCK_CREATE = new Object()
+
+  private val WRITER_LOCK_CLOSE = new Object()
+
   def getMetaData: MetaData = rMetaData
 
   def setProxyUser(proxyUser: String): Unit = {
@@ -74,16 +82,29 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
   }
 
   def createNewFile: Unit = {
-    if (storePath != null && outputStream == null) {
-      fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
-      fs.init(null)
-      FileSystemUtils.createNewFile(storePath, proxyUser, true)
-      outputStream = fs.write(storePath, true)
-      logger.info(s"Succeed to create a new file:$storePath")
+    if (!fileCreated) {
+      WRITER_LOCK_CREATE.synchronized {
+        if (!fileCreated) {
+          if (storePath != null && outputStream == null) {
+            fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
+            fs.init(null)
+            FileSystemUtils.createNewFile(storePath, proxyUser, true)
+            outputStream = fs.write(storePath, true)
+            logger.info(s"Succeed to create a new file:$storePath")
+            fileCreated = true
+          }
+        }
+      }
+    } else if (null != storePath && null == outputStream) {
+      logger.warn("outputStream had been set null, but createNewFile() was called again.")
     }
   }
 
   def writeLine(bytes: Array[Byte], cache: Boolean = false): Unit = {
+    if (closed) {
+      logger.warn("the writer had been closed, but writeLine() was still called.")
+      return
+    }
     if (bytes.length > LinkisStorageConf.ROW_BYTE_MAX_LEN) {
       throw new IOException(
         s"A single row of data cannot exceed ${LinkisStorageConf.ROW_BYTE_MAX_LEN_STR}"
@@ -153,6 +174,18 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
   }
 
   override def close(): Unit = {
+    if (closed) {
+      logger.warn("the writer had been closed, but close() was still called.")
+      return
+    } else {
+      WRITER_LOCK_CLOSE.synchronized {
+        if (!closed) {
+          closed = true
+        } else {
+          return
+        }
+      }
+    }
     Utils.tryFinally(if (outputStream != null) flush()) {
       closeFs
       if (outputStream != null) {
@@ -178,6 +211,11 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
         }
       }(s"Error encounters when flush result set ")
     }
+    if (closed) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("the writer had been closed, but flush() was still called.")
+      }
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org