You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ja...@apache.org on 2022/10/26 10:03:10 UTC
[incubator-linkis] branch dev-1.3.1 updated: Fix writer: the file is emptied when flush is called after close (close #3709)
This is an automated email from the ASF dual-hosted git repository.
jackxu2011 pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 487482086 Fix writer: the file is emptied when flush is called after close (close #3709)
new 5d49bf9e5 Merge pull request #3710 from WeDataSphere/dev-1.3.1-storage-fix
487482086 is described below
commit 4874820868664f62fb04ef9c6c6152eb0936d4f1
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Oct 24 20:46:38 2022 +0800
Fix writer: the file is emptied when flush is called after close (close #3709)
---
.../storage/resultset/StorageResultSetWriter.scala | 50 +++++++++++++++++++---
1 file changed, 44 insertions(+), 6 deletions(-)
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index 9d56adc45..9c7947272 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -59,6 +59,14 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
private var proxyUser: String = StorageUtils.getJvmUser
+ private var fileCreated = false
+
+ private var closed = false
+
+ private val WRITER_LOCK_CREATE = new Object()
+
+ private val WRITER_LOCK_CLOSE = new Object()
+
def getMetaData: MetaData = rMetaData
def setProxyUser(proxyUser: String): Unit = {
@@ -74,16 +82,29 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
}
def createNewFile: Unit = {
- if (storePath != null && outputStream == null) {
- fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
- fs.init(null)
- FileSystemUtils.createNewFile(storePath, proxyUser, true)
- outputStream = fs.write(storePath, true)
- logger.info(s"Succeed to create a new file:$storePath")
+ if (!fileCreated) {
+ WRITER_LOCK_CREATE.synchronized {
+ if (!fileCreated) {
+ if (storePath != null && outputStream == null) {
+ fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
+ fs.init(null)
+ FileSystemUtils.createNewFile(storePath, proxyUser, true)
+ outputStream = fs.write(storePath, true)
+ logger.info(s"Succeed to create a new file:$storePath")
+ fileCreated = true
+ }
+ }
+ }
+ } else if (null != storePath && null == outputStream) {
+ logger.warn("outputStream had been set null, but createNewFile() was called again.")
}
}
def writeLine(bytes: Array[Byte], cache: Boolean = false): Unit = {
+ if (closed) {
+ logger.warn("the writer had been closed, but writeLine() was still called.")
+ return
+ }
if (bytes.length > LinkisStorageConf.ROW_BYTE_MAX_LEN) {
throw new IOException(
s"A single row of data cannot exceed ${LinkisStorageConf.ROW_BYTE_MAX_LEN_STR}"
@@ -153,6 +174,18 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
}
override def close(): Unit = {
+ if (closed) {
+ logger.warn("the writer had been closed, but close() was still called.")
+ return
+ } else {
+ WRITER_LOCK_CLOSE.synchronized {
+ if (!closed) {
+ closed = true
+ } else {
+ return
+ }
+ }
+ }
Utils.tryFinally(if (outputStream != null) flush()) {
closeFs
if (outputStream != null) {
@@ -178,6 +211,11 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](
}
}(s"Error encounters when flush result set ")
}
+ if (closed) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("the writer had been closed, but flush() was still called.")
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org