You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/24 09:49:04 UTC
[incubator-linkis] branch dev-1.1.1 updated: Limit the size of data written to a single row of the result set #1793 (#1794)
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 75b4936 Limit the size of data written to a single row of the result set #1793 (#1794)
75b4936 is described below
commit 75b49367f0329b6d5429f1391084a83d57ea5460
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Mar 24 17:48:58 2022 +0800
Limit the size of data written to a single row of the result set #1793 (#1794)
---
.../linkis/storage/conf/LinkisStorageConf.scala | 8 +++++-
.../storage/resultset/StorageResultSetWriter.scala | 29 ++++++++++++----------
2 files changed, 23 insertions(+), 14 deletions(-)
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
index c81542b..87a8fc2 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
@@ -17,7 +17,8 @@
package org.apache.linkis.storage.conf
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.linkis.common.utils.ByteTimeUtils
object LinkisStorageConf {
val HDFS_FILE_SYSTEM_REST_ERRS: String =
@@ -25,4 +26,9 @@ object LinkisStorageConf {
"wds.linkis.hdfs.rest.errs",
".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*")
.getValue
+
+ val ROW_BYTE_MAX_LEN_STR = CommonVars("wds.linkis.resultset.row.max.str", "10m").getValue
+
+ val ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR)
+
}
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index 8277415..6cc3c09 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -18,19 +18,19 @@
package org.apache.linkis.storage.resultset
import java.io.{IOException, OutputStream}
-
import org.apache.linkis.common.io.resultset.{ResultSerializer, ResultSet, ResultSetWriter}
import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.storage.FSFactory
+import org.apache.linkis.storage.conf.LinkisStorageConf
import org.apache.linkis.storage.domain.Dolphin
import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils}
import scala.collection.mutable.ArrayBuffer
-class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,V], maxCacheSize: Long,
- storePath: FsPath) extends ResultSetWriter[K,V](resultSet = resultSet, maxCacheSize = maxCacheSize, storePath = storePath) with Logging{
+class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K, V], maxCacheSize: Long,
+ storePath: FsPath) extends ResultSetWriter[K, V](resultSet = resultSet, maxCacheSize = maxCacheSize, storePath = storePath) with Logging{
@@ -77,6 +77,9 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
}
def writeLine(bytes: Array[Byte], cache: Boolean = false): Unit = {
+ if (bytes.length > LinkisStorageConf.ROW_BYTE_MAX_LEN) {
+ throw new IOException(s"A single row of data cannot exceed ${LinkisStorageConf.ROW_BYTE_MAX_LEN_STR}")
+ }
if (buffer.length > maxCacheSize && !cache) {
if (outputStream == null) {
createNewFile
@@ -89,9 +92,9 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
}
override def toString: String = {
- if(outputStream == null){
- if(isEmpty) return ""
- new String(buffer.toArray,Dolphin.CHAR_SET)
+ if (outputStream == null) {
+ if (isEmpty) return ""
+ new String(buffer.toArray, Dolphin.CHAR_SET)
} else {
storePath.getSchemaPath
}
@@ -100,7 +103,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
override def toFSPath: FsPath = storePath
override def addMetaDataAndRecordString(content: String): Unit = {
- if(!moveToWriteRow){
+ if (!moveToWriteRow) {
val bytes = content.getBytes(Dolphin.CHAR_SET)
writeLine(bytes)
}
@@ -111,7 +114,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
@scala.throws[IOException]
override def addMetaData(metaData: MetaData): Unit = {
- if(!moveToWriteRow) {
+ if (!moveToWriteRow) {
{
rMetaData = metaData
init()
@@ -127,20 +130,20 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
@scala.throws[IOException]
override def addRecord(record: Record): Unit = {
- if(moveToWriteRow) {
+ if (moveToWriteRow) {
rowCount = rowCount + 1
writeLine(serializer.recordToBytes(record))
}
}
def closeFs: Unit = {
- if(fs != null)
+ if (fs != null)
fs.close()
}
override def close(): Unit = {
- Utils.tryFinally(if(outputStream != null ) flush()){
+ Utils.tryFinally(if (outputStream != null ) flush()){
closeFs
- if(outputStream != null){
+ if (outputStream != null) {
outputStream.close()
}}
}
@@ -148,7 +151,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
override def flush(): Unit = {
createNewFile
if(outputStream != null) {
- if(buffer.nonEmpty) {
+ if (buffer.nonEmpty) {
outputStream.write(buffer.toArray)
buffer.clear()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org