Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/24 09:49:04 UTC

[incubator-linkis] branch dev-1.1.1 updated: Limit the size of data written to a single row of the result set #1793 (#1794)

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new 75b4936  Limit the size of data written to a single row of the result set #1793 (#1794)
75b4936 is described below

commit 75b49367f0329b6d5429f1391084a83d57ea5460
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Mar 24 17:48:58 2022 +0800

    Limit the size of data written to a single row of the result set #1793 (#1794)
---
 .../linkis/storage/conf/LinkisStorageConf.scala    |  8 +++++-
 .../storage/resultset/StorageResultSetWriter.scala | 29 ++++++++++++----------
 2 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
index c81542b..87a8fc2 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
@@ -17,7 +17,8 @@
 
 package org.apache.linkis.storage.conf
 
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.linkis.common.utils.ByteTimeUtils
 
 object LinkisStorageConf {
   val HDFS_FILE_SYSTEM_REST_ERRS: String =
@@ -25,4 +26,9 @@ object LinkisStorageConf {
       "wds.linkis.hdfs.rest.errs",
       ".*Filesystem closed.*|.*Failed to find any Kerberos tgt.*")
       .getValue
+
+  val ROW_BYTE_MAX_LEN_STR = CommonVars("wds.linkis.resultset.row.max.str", "10m").getValue
+
+  val ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR)
+
 }
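
The new limit is configured as a human-readable size string ("10m" by default) and converted to a byte count once, at load time, via ByteTimeUtils.byteStringAsBytes. For illustration, here is a minimal, self-contained sketch of that kind of size-string parsing. It is not the Linkis implementation; the accepted grammar (optional k/m/g suffix, binary 1024-based multiples) is an assumption about how such utilities commonly behave:

    // Minimal sketch of size-string parsing (illustration only, not the
    // Linkis ByteTimeUtils implementation; units and grammar are assumed).
    object SizeStringSketch {
      private val SizePattern = """(?i)\s*(\d+)\s*([kmg]?b?)\s*""".r

      def byteStringAsBytes(str: String): Long = str match {
        case SizePattern(num, unit) =>
          val multiplier = unit.toLowerCase.stripSuffix("b") match {
            case ""    => 1L
            case "k"   => 1L << 10
            case "m"   => 1L << 20
            case "g"   => 1L << 30
            case other => throw new NumberFormatException(s"Unknown unit: $other")
          }
          num.toLong * multiplier
        case _ => throw new NumberFormatException(s"Invalid size string: $str")
      }

      def main(args: Array[String]): Unit = {
        // Under the binary-multiple assumption, the "10m" default above
        // resolves to 10 * 1024 * 1024 bytes.
        println(byteStringAsBytes("10m")) // 10485760
      }
    }
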
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
index 8277415..6cc3c09 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetWriter.scala
@@ -18,19 +18,19 @@
 package org.apache.linkis.storage.resultset
 
 import java.io.{IOException, OutputStream}
-
 import org.apache.linkis.common.io.resultset.{ResultSerializer, ResultSet, ResultSetWriter}
 import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.storage.FSFactory
+import org.apache.linkis.storage.conf.LinkisStorageConf
 import org.apache.linkis.storage.domain.Dolphin
 import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils}
 
 import scala.collection.mutable.ArrayBuffer
 
 
-class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,V], maxCacheSize: Long,
-                           storePath: FsPath) extends ResultSetWriter[K,V](resultSet = resultSet, maxCacheSize = maxCacheSize, storePath = storePath) with Logging{
+class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K, V], maxCacheSize: Long,
+                           storePath: FsPath) extends ResultSetWriter[K, V](resultSet = resultSet, maxCacheSize = maxCacheSize, storePath = storePath) with Logging{
 
 
 
@@ -77,6 +77,9 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
   }
 
   def writeLine(bytes: Array[Byte], cache: Boolean = false): Unit = {
+    if (bytes.length > LinkisStorageConf.ROW_BYTE_MAX_LEN) {
+      throw new IOException(s"A single row of data cannot exceed ${LinkisStorageConf.ROW_BYTE_MAX_LEN_STR}")
+    }
     if (buffer.length > maxCacheSize && !cache) {
       if (outputStream == null) {
         createNewFile
@@ -89,9 +92,9 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
   }
 
   override def toString: String = {
-   if(outputStream == null){
-     if(isEmpty) return ""
-      new String(buffer.toArray,Dolphin.CHAR_SET)
+   if (outputStream == null) {
+     if (isEmpty) return ""
+      new String(buffer.toArray, Dolphin.CHAR_SET)
     } else {
       storePath.getSchemaPath
     }
@@ -100,7 +103,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
   override def toFSPath: FsPath = storePath
 
   override def addMetaDataAndRecordString(content: String): Unit = {
-    if(!moveToWriteRow){
+    if (!moveToWriteRow) {
       val bytes = content.getBytes(Dolphin.CHAR_SET)
       writeLine(bytes)
     }
@@ -111,7 +114,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
 
   @scala.throws[IOException]
   override def addMetaData(metaData: MetaData): Unit = {
-    if(!moveToWriteRow) {
+    if (!moveToWriteRow) {
       {
         rMetaData = metaData
         init()
@@ -127,20 +130,20 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
 
   @scala.throws[IOException]
   override def addRecord(record: Record): Unit = {
-    if(moveToWriteRow) {
+    if (moveToWriteRow) {
       rowCount = rowCount + 1
       writeLine(serializer.recordToBytes(record))
     }
   }
 
   def closeFs: Unit = {
-    if(fs != null)
+    if (fs != null)
       fs.close()
   }
   override def close(): Unit = {
-    Utils.tryFinally(if(outputStream != null ) flush()){
+    Utils.tryFinally(if (outputStream != null ) flush()){
       closeFs
-      if(outputStream != null){
+      if (outputStream != null) {
       outputStream.close()
     }}
   }
@@ -148,7 +151,7 @@ class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K,
   override def flush(): Unit = {
     createNewFile
     if(outputStream != null) {
-      if(buffer.nonEmpty) {
+      if (buffer.nonEmpty) {
         outputStream.write(buffer.toArray)
         buffer.clear()
       }
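
The behavioral change is concentrated in writeLine: a serialized row larger than the configured cap is now rejected with an IOException before it is cached or written, so a single oversized row cannot exhaust the writer's buffer. Below is a stripped-down sketch of that guard; RowCapWriter is a hypothetical stand-in, and the real StorageResultSetWriter additionally manages the output stream, file creation, and the Dolphin serialization shown above:

    import java.io.IOException

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for StorageResultSetWriter, reduced to the new
    // row-size guard: fail fast before the row touches the cache.
    class RowCapWriter(maxRowBytes: Long, maxRowBytesStr: String) {
      private val buffer = ArrayBuffer[Byte]()

      def writeLine(bytes: Array[Byte]): Unit = {
        if (bytes.length > maxRowBytes) {
          // Same failure mode as the commit: name the human-readable limit.
          throw new IOException(s"A single row of data cannot exceed $maxRowBytesStr")
        }
        buffer ++= bytes
      }
    }

    object RowCapWriterDemo {
      def main(args: Array[String]): Unit = {
        val writer = new RowCapWriter(maxRowBytes = 10L << 20, maxRowBytesStr = "10m")
        writer.writeLine(Array.fill(1024)(0: Byte))          // accepted: 1 KB row
        try writer.writeLine(Array.fill(11 << 20)(0: Byte))  // rejected: 11 MB row
        catch { case e: IOException => println(s"rejected: ${e.getMessage}") }
      }
    }

Note that the check runs on every row, so its cost is one length comparison per record.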
