Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/22 15:59:36 UTC

[GitHub] [spark] kevins-29 opened a new pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

kevins-29 opened a new pull request #35613:
URL: https://github.com/apache/spark/pull/35613


   ### What changes were proposed in this pull request?
   Wrap the `DataInputStream` in the `SparkPlan.decodeUnsafeRows` method with a `NextIterator` instead of a plain `Iterator`. This allows us to close the `DataInputStream` properly.
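
   As a rough illustration of the idea (not the code in this PR), the sketch below shows an iterator over length-prefixed records that closes its stream as soon as the end marker is read. `ClosingIterator` and `DecodeSketch` are hypothetical names standing in for Spark's `NextIterator` and `decodeUnsafeRows`; the `-1` end marker and the plain `ByteArrayInputStream` (used here in place of `codec.compressedInputStream`) are assumptions made to keep the example self-contained.

   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

   // Hypothetical stand-in for Spark's NextIterator: subclasses implement
   // getNext() and close(); close() runs at most once, when the data runs out.
   abstract class ClosingIterator[A] extends Iterator[A] {
     private var gotNext = false
     private var nextValue: A = null.asInstanceOf[A]
     private var closed = false
     protected var finished = false

     protected def getNext(): A
     protected def close(): Unit

     def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

     override def hasNext: Boolean = {
       if (!finished && !gotNext) {
         nextValue = getNext()
         if (finished) closeIfNeeded()
         gotNext = true
       }
       !finished
     }

     override def next(): A = {
       if (!hasNext) throw new NoSuchElementException("End of stream")
       gotNext = false
       nextValue
     }
   }

   object DecodeSketch {
     // Writes each row as [length][bytes] and terminates with -1 (assumed format).
     def encode(rows: Seq[Array[Byte]]): Array[Byte] = {
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(bos)
       rows.foreach { r => out.writeInt(r.length); out.write(r) }
       out.writeInt(-1)
       out.flush()
       bos.toByteArray
     }

     // Reads rows lazily; the stream is closed when the -1 marker is reached.
     // onClose is a hypothetical hook so callers can observe the close.
     def decode(bytes: Array[Byte], onClose: () => Unit): Iterator[Array[Byte]] = {
       val ins = new DataInputStream(new ByteArrayInputStream(bytes))
       new ClosingIterator[Array[Byte]] {
         private var sizeOfNextRow = ins.readInt()
         override protected def getNext(): Array[Byte] = {
           if (sizeOfNextRow >= 0) {
             val bs = new Array[Byte](sizeOfNextRow)
             ins.readFully(bs)
             sizeOfNextRow = ins.readInt()
             bs
           } else {
             finished = true
             null
           }
         }
         override protected def close(): Unit = { ins.close(); onClose() }
       }
     }
   }
   ```

   In this sketch the stream is closed only once the iterator is fully consumed; a caller that stops early would need a reference to the `ClosingIterator` itself to call `closeIfNeeded()`.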
   
   ### Why are the changes needed?
   SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of `CompressionCodec.compressedInputStream` would need to manually close the stream as this would no longer be handled by the finaliser mechanism.
   
   In SparkPlan, the result of `CompressionCodec.compressedInputStream` is wrapped in an Iterator which never calls close. 
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   
   #### Spark Shell Configuration
   ```bash
   $> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
   $> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
   ```
   
   #### Test Script
   ```scala
   import java.sql.Timestamp
   import java.time.Instant
   import spark.implicits._
   
   case class Record(timestamp: Timestamp, batch: Long, value: Long)
   
   (1 to 300).foreach { batch =>
     sc.parallelize(1 to 100000).map(Record(Timestamp.from(Instant.now()), batch, _)).toDS.write.parquet(s"test_data/batch_$batch")
   }
   
   (1 to 300).foreach(batch => spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect())
   
   ```
   
   #### Memory Monitor
   ```shell
   $> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done;
   ``` 
   
   #### Results
   
   ##### Before - 1st Run
   ```
   "2022-02-22 14:12:27",1449308
   "2022-02-22 14:12:37",1449396
   "2022-02-22 14:12:47",1531332
   "2022-02-22 14:12:57",1742336
   "2022-02-22 14:13:08",1796308
   "2022-02-22 14:13:18",1811164
   "2022-02-22 14:13:28",1855408
   "2022-02-22 14:13:38",1859080
   "2022-02-22 14:13:48",1901852
   "2022-02-22 14:13:58",1915132
   "2022-02-22 14:14:08",1983812
   "2022-02-22 14:14:18",1982132
   "2022-02-22 14:14:28",1996888
   "2022-02-22 14:14:38",1997440
   "2022-02-22 14:14:48",1999376
   "2022-02-22 14:14:59",2000100
   "2022-02-22 14:15:09",2213684
   "2022-02-22 14:15:19",2358572
   "2022-02-22 14:15:29",2559824
   "2022-02-22 14:15:39",2728744
   "2022-02-22 14:15:49",2900608
   "2022-02-22 14:15:59",3095464
   "2022-02-22 14:16:09",3277204
   "2022-02-22 14:16:19",3470144
   "2022-02-22 14:16:29",3628048
   "2022-02-22 14:16:40",3628048
   "2022-02-22 14:16:50",3628048
   ```
   
   ##### Before - 2nd Run
   ```
   "2022-02-22 14:27:33",1498036
   "2022-02-22 14:27:43",1581348
   "2022-02-22 14:27:53",1781136
   "2022-02-22 14:28:03",1792796
   "2022-02-22 14:28:13",1853848
   "2022-02-22 14:28:23",1872000
   "2022-02-22 14:28:33",1874008
   "2022-02-22 14:28:44",1892828
   "2022-02-22 14:28:54",1927752
   "2022-02-22 14:29:04",1950756
   "2022-02-22 14:29:14",1981436
   "2022-02-22 14:29:24",1969212
   "2022-02-22 14:29:34",1971572
   "2022-02-22 14:29:44",1972124
   "2022-02-22 14:29:54",2113104
   "2022-02-22 14:30:04",2283932
   "2022-02-22 14:30:14",2475672
   "2022-02-22 14:30:24",2670424
   "2022-02-22 14:30:35",2883556
   "2022-02-22 14:30:45",3094044
   "2022-02-22 14:30:55",3333840
   "2022-02-22 14:31:05",3570224
   "2022-02-22 14:31:15",3618060
   "2022-02-22 14:31:25",3618060
   "2022-02-22 14:31:35",3618060
   "2022-02-22 14:31:46",3617892
   "2022-02-22 14:31:56",3617220
   "2022-02-22 14:32:06",3593240
   "2022-02-22 14:32:16",3583920
   "2022-02-22 14:32:26",3583920
   ```
   
   ##### After - 1st Run
   ```
   "2022-02-22 14:22:17",1483220
   "2022-02-22 14:22:27",1606236
   "2022-02-22 14:22:37",1757304
   "2022-02-22 14:22:47",1809436
   "2022-02-22 14:22:57",1863628
   "2022-02-22 14:23:07",1860172
   "2022-02-22 14:23:17",1858216
   "2022-02-22 14:23:27",1898056
   "2022-02-22 14:23:37",1936076
   "2022-02-22 14:23:48",1937776
   "2022-02-22 14:23:58",1926536
   "2022-02-22 14:24:08",1936188
   "2022-02-22 14:24:18",2048996
   "2022-02-22 14:24:28",2093032
   "2022-02-22 14:24:38",2118164
   "2022-02-22 14:24:48",2145980
   "2022-02-22 14:24:58",2133464
   "2022-02-22 14:25:08",2134964
   "2022-02-22 14:25:18",2132620
   "2022-02-22 14:25:28",2132620
   ```
   
   ##### After - 2nd Run
   ```
   "2022-02-22 14:33:44",1463284
   "2022-02-22 14:33:54",1558932
   "2022-02-22 14:34:04",1749700
   "2022-02-22 14:34:14",1785908
   "2022-02-22 14:34:24",1829740
   "2022-02-22 14:34:34",1891472
   "2022-02-22 14:34:44",1882452
   "2022-02-22 14:34:54",1880788
   "2022-02-22 14:35:04",1893988
   "2022-02-22 14:35:14",1968108
   "2022-02-22 14:35:24",1974868
   "2022-02-22 14:35:34",1977516
   "2022-02-22 14:35:45",1964580
   "2022-02-22 14:35:55",1966580
   "2022-02-22 14:36:05",2057708
   "2022-02-22 14:36:15",2116624
   "2022-02-22 14:36:25",2126248
   "2022-02-22 14:36:35",2123440
   "2022-02-22 14:36:45",2123724
   "2022-02-22 14:36:55",2123540
   "2022-02-22 14:37:05",2122028
   "2022-02-22 14:37:15",2122028
   "2022-02-22 14:37:25",2122028
   "2022-02-22 14:37:35",2122028
   "2022-02-22 14:37:46",2118072
   "2022-02-22 14:37:56",2096524
   "2022-02-22 14:38:06",2096524
   ```
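
   To put the four runs side by side, a small sketch like the following computes the growth between the first and last sample of each run. The sample values are copied from the logs above; `RssGrowth` is a hypothetical helper, and reading column 4 of the `pmap -x` total line as RSS in kB is an assumption.

   ```scala
   object RssGrowth {
     // (run label, first sample in kB, last sample in kB), copied from the logs above.
     val runs: Seq[(String, Long, Long)] = Seq(
       ("Before - 1st Run", 1449308L, 3628048L),
       ("Before - 2nd Run", 1498036L, 3583920L),
       ("After - 1st Run", 1483220L, 2132620L),
       ("After - 2nd Run", 1463284L, 2096524L)
     )

     // Growth between the first and last sample of a run, in kB.
     def growthKb(first: Long, last: Long): Long = last - first

     def main(args: Array[String]): Unit =
       runs.foreach { case (label, first, last) =>
         println(s"$label: +${growthKb(first, last)} kB")
       }
   }
   ```

   On these samples the "Before" runs grow by roughly 2.1 to 2.2 million kB, while the "After" runs grow by roughly 0.63 to 0.65 million kB.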


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun closed pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #35613:
URL: https://github.com/apache/spark/pull/35613


   




[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048096616


   With Java 11, the scale is similar.
   ```
   $ java -version
   openjdk version "11.0.10" 2021-01-19
   OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
   OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
   
   $ while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x 1427761 | grep "total kB" | awk '{print $4}'); sleep 10; done;
   "2022-02-22 10:32:10",848284
   "2022-02-22 10:32:20",901100
   "2022-02-22 10:32:30",1005732
   "2022-02-22 10:32:40",1039640
   "2022-02-22 10:32:50",1079424
   "2022-02-22 10:33:00",1084296
   "2022-02-22 10:33:10",1084360
   "2022-02-22 10:33:20",1590012
   "2022-02-22 10:33:30",1659336
   "2022-02-22 10:33:40",1819064
   "2022-02-22 10:33:50",1899304
   "2022-02-22 10:34:00",1985772
   "2022-02-22 10:34:10",2034652
   "2022-02-22 10:34:20",2037392
   ```
   
   To be clear, this patch shows the improvement (2037392 -> 1739112), of course.
   ```
   $ while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x 1404956 | grep "total kB" | awk '{print $4}'); sleep 10; done;
   "2022-02-22 10:21:42",823772
   "2022-02-22 10:21:52",844600
   "2022-02-22 10:22:02",986640
   "2022-02-22 10:22:12",1019404
   "2022-02-22 10:22:22",1052240
   "2022-02-22 10:22:32",1061876
   "2022-02-22 10:22:42",1063884
   "2022-02-22 10:22:52",1584192
   "2022-02-22 10:23:02",1625884
   "2022-02-22 10:23:12",1681960
   "2022-02-22 10:23:22",1699980
   "2022-02-22 10:23:32",1740428
   "2022-02-22 10:23:43",1738484
   "2022-02-22 10:23:53",1739112
   ```




[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812355588



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {

Review comment:
       Could we add `finished` into the check, i.e. `if (!finished && sizeOfNextRow >= 0)`? 
   






[GitHub] [spark] kevins-29 commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1049525229


   I did a final run with all of the above changes:
   
   #### Results
   ```
   "2022-02-23 21:42:49",1380280
   "2022-02-23 21:42:59",1497724
   "2022-02-23 21:43:09",1577204
   "2022-02-23 21:43:19",1607924
   "2022-02-23 21:43:29",1629520
   "2022-02-23 21:43:39",1635496
   "2022-02-23 21:43:49",1637988
   "2022-02-23 21:43:59",1690768
   "2022-02-23 21:44:09",1681452
   "2022-02-23 21:44:19",1685392
   "2022-02-23 21:44:29",1808616
   "2022-02-23 21:44:39",1812516
   "2022-02-23 21:44:49",1821692
   "2022-02-23 21:44:59",1824572
   "2022-02-23 21:45:09",1827732
   "2022-02-23 21:45:19",1829752
   "2022-02-23 21:45:29",1829344
   "2022-02-23 21:45:39",1833076
   "2022-02-23 21:45:49",1833788
   "2022-02-23 21:45:59",1835164
   "2022-02-23 21:46:09",1836276
   "2022-02-23 21:46:19",1837496
   "2022-02-23 21:46:29",1837904
   "2022-02-23 21:46:39",1845368
   "2022-02-23 21:46:49",1838824
   "2022-02-23 21:47:00",1838992
   "2022-02-23 21:47:10",1839456
   "2022-02-23 21:47:20",1839712
   "2022-02-23 21:47:30",1839856
   "2022-02-23 21:47:40",1865444
   "2022-02-23 21:47:50",1874092
   "2022-02-23 21:48:00",1875056
   "2022-02-23 21:48:10",1875496
   "2022-02-23 21:48:20",1876280
   "2022-02-23 21:48:30",1876680
   "2022-02-23 21:48:40",1879324
   "2022-02-23 21:48:50",1879492
   "2022-02-23 21:49:00",1879360
   "2022-02-23 21:49:10",1879296
   "2022-02-23 21:49:20",1878556
   "2022-02-23 21:49:30",1877264
   "2022-02-23 21:49:40",1872088
   "2022-02-23 21:49:50",1859340
   "2022-02-23 21:50:00",1859340
   
   ```




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r813582646



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
+      private def _next(): InternalRow = {
         val bs = new Array[Byte](sizeOfNextRow)
         ins.readFully(bs)
         val row = new UnsafeRow(nFields)
         row.pointTo(bs, sizeOfNextRow)
         sizeOfNextRow = ins.readInt()
         row
       }

Review comment:
       ```suggestion
         }
    
   ```
   
   (per https://github.com/databricks/scala-style-guide#blank-lines-vertical-whitespace)






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812522469



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))

Review comment:
       Hm, do we have a case where `TaskContext` is set? I thought `decodeUnsafeRows` is always invoked on the driver side to decode rows.






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812637273



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       But I would agree that `CompletionIterator` provides a neater solution.
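
   For readers unfamiliar with the pattern, a completion-style wrapper can be sketched as follows. This is a hypothetical minimal analogue, not Spark's `CompletionIterator`; `OnCompleteIterator`, `OnCompleteDemo`, and the `-1` end marker are assumptions made for illustration.

   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

   // Runs `completion` exactly once, the first time the wrapped iterator
   // reports that it has no more elements.
   final class OnCompleteIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
     private var completed = false

     override def hasNext: Boolean = {
       val more = sub.hasNext
       if (!more && !completed) {
         completed = true
         completion()
       }
       more
     }

     override def next(): A = sub.next()
   }

   object OnCompleteDemo {
     // Writes non-negative ints followed by -1 as an end marker (assumed format).
     def encode(values: Seq[Int]): Array[Byte] = {
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(bos)
       values.foreach(v => out.writeInt(v))
       out.writeInt(-1)
       out.flush()
       bos.toByteArray
     }

     // The plain iterator stays unchanged; closing is added by the wrapper.
     def rows(bytes: Array[Byte], onClose: () => Unit): Iterator[Int] = {
       val ins = new DataInputStream(new ByteArrayInputStream(bytes))
       val plain = new Iterator[Int] {
         private var nextValue = ins.readInt()
         override def hasNext: Boolean = nextValue >= 0
         override def next(): Int = { val v = nextValue; nextValue = ins.readInt(); v }
       }
       new OnCompleteIterator(plain, () => { ins.close(); onClose() })
     }
   }
   ```

   The appeal of this shape is that the reading logic is left as it was and the cleanup lives in one place. As with any exhaustion-triggered cleanup, the callback does not run if the consumer never reaches the end of the iterator.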






[GitHub] [spark] kevins-29 commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048066739


   Hi @dongjoon-hyun,
   
   I ran the above tests on a build from master. The test was run using AppleJDK-11.0.5.10.1.
   
   Did you run `export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"` before starting spark-shell? The reason for this is that we want to "lock in" the amount of RSS taken up by the heap at the start of the process, so that any further growth can be attributed to native libraries.
   
   Also, the assumption was that the default `-Xmx1g` would be used.




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812319583



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      private val taskContext: TaskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.addTaskCompletionListener[Unit] { _ => closeIfNeeded() }
+      }

Review comment:
       BTW, shall we declare this before `new NextIterator[InternalRow] {` instead of here?






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r813480379



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {

Review comment:
       Or turn the existing function `override def next(): InternalRow = {` into a private function, and call it in `getNext`.






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812633201



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case t: Throwable if ins != null =>
+              ins.close()
+              throw t
+          }
+        } else {
+          finished = true
+          null
+        }

Review comment:
       `NextIterator` does that for us
   ```
     override def hasNext: Boolean = {
       if (!finished) {
         if (!gotNext) {
           nextValue = getNext()
           if (finished) {
             closeIfNeeded()
           }
           gotNext = true
         }
       }
       !finished
     }
   ``` 






[GitHub] [spark] kevins-29 commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048174084


   Regarding the discrepancy when re-testing: I made a mistake when documenting the script. I originally used 1,000,000 records per batch, not 100,000. I have updated the script accordingly.




[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048072463


   Yes, I followed the instructions on Ubuntu/Java17 without any other options. Let me try it again with Java 11.




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812520465



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case e: Exception =>

Review comment:
       Maybe `Throwable` would be better.






[GitHub] [spark] sunchao commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812532597



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)

Review comment:
       We can't close it here since we'll need to read multiple rows from the input stream, one for each `next` call, and only close when done processing the iterator.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))

Review comment:
       Yea we can't use `TaskContext` here: seems we need to find some other way to close the input stream.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case e: Exception =>

Review comment:
       +1. It's safer to catch on `Throwable`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048468355


   Can one of the admins verify this patch?




[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1049531332


   Thank you for the final update. Please put it into the PR description, @kevins-29 .




[GitHub] [spark] sunchao commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812366848



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       nit: should we also protect this with `try .. catch` and close the input when error happens?






[GitHub] [spark] sunchao commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812419063



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       Yeah, it seems you are right. `decodeUnsafeRows` is only called on the collected result on the driver. In that case, I wonder if we should find another way to make sure the input stream is closed at the end.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812523493



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)

Review comment:
       I guess I'm missing something, but can't we just call `close` here?






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812633201



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case t: Throwable if ins != null =>
+              ins.close()
+              throw t
+          }
+        } else {
+          finished = true
+          null
+        }

Review comment:
       NextIterator does that for us
   ```scala
     override def hasNext: Boolean = {
       if (!finished) {
         if (!gotNext) {
           nextValue = getNext()
           if (finished) {
             closeIfNeeded()
           }
           gotNext = true
         }
       }
       !finished
     }
   ``` 
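
For readers without the Spark source at hand, the behaviour quoted above can be reproduced with a simplified stand-in. `SketchNextIterator` and `CountingIterator` are hypothetical names for this sketch, and the class only mirrors the `hasNext` logic shown in the comment rather than Spark's actual `NextIterator`.

```scala
/** Simplified stand-in that mirrors the quoted hasNext logic; not Spark's class. */
abstract class SketchNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = null.asInstanceOf[U]
  private var closed = false
  protected var finished = false

  /** Returns the next element, or sets `finished = true` when there is none. */
  protected def getNext(): U
  protected def close(): Unit

  /** Calls close() at most once. */
  def closeIfNeeded(): Unit = if (!closed) {
    closed = true
    close()
  }

  override def hasNext: Boolean = {
    if (!finished) {
      if (!gotNext) {
        nextValue = getNext()
        if (finished) {
          closeIfNeeded()
        }
        gotNext = true
      }
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

/** Yields 1..limit and counts how often close() runs. */
class CountingIterator(limit: Int) extends SketchNextIterator[Int] {
  private var i = 0
  var closeCalls = 0

  override protected def getNext(): Int =
    if (i < limit) {
      i += 1
      i
    } else {
      finished = true
      0 // placeholder; never handed to the consumer because finished is set
    }

  override protected def close(): Unit = closeCalls += 1
}
```

Draining `new CountingIterator(3)` with `toList` triggers `close()` exactly once, on the `hasNext` call that discovers the end of the data.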






[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048175525


   Thank you for the investigation and update!




[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812365874



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {

Review comment:
       I think it would be redundant as `getNext()` is only called from `hasNext` and only if `!finished`
   ```scala
     override def hasNext: Boolean = {
       if (!finished) {
         if (!gotNext) {
           nextValue = getNext()
           if (finished) {
             closeIfNeeded()
           }
           gotNext = true
         }
       }
       !finished
     }
     ```






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812541101



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)

Review comment:
       ohh oops.






[GitHub] [spark] mridulm commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1049483290


   +CC @zhouyejoe 




[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812355588



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {

Review comment:
       Could we add `finished` into the check, i.e. `if (!finished && sizeOfNextRow >= 0)`?
   






[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812421799



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {

Review comment:
       okay






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812636262



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       Both `NextIterator` and `CompletionIterator` suffer from the same limitation: if the consumer does not consume the iterator in its entirety, either because of an exception while processing an item or because it uses a short-circuiting method such as `Iterator.find`, then neither will call `close`.
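
The limitation described above can be shown with a small self-contained sketch. `OnExhaustion` is a hypothetical stand-in for a completion-style wrapper (it is not Spark's `CompletionIterator`), and the cleanup callback only flips a flag so that the effect is observable.

```scala
/** Runs `onDone` once, but only when the underlying iterator reports exhaustion. */
class OnExhaustion[A](underlying: Iterator[A], onDone: () => Unit) extends Iterator[A] {
  private var done = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !done) {
      done = true
      onDone()
    }
    more
  }

  override def next(): A = underlying.next()
}

object PartialConsumptionDemo {
  /** Stops at the first even number; returns the hit and whether cleanup ran. */
  def findFirstEven(): (Option[Int], Boolean) = {
    var cleanedUp = false
    val it = new OnExhaustion(Iterator(1, 2, 3, 4), () => { cleanedUp = true })
    val hit = it.find(_ % 2 == 0)
    (hit, cleanedUp)
  }

  /** Consumes everything; returns the elements and whether cleanup ran. */
  def drain(): (List[Int], Boolean) = {
    var cleanedUp = false
    val it = new OnExhaustion(Iterator(1, 2, 3, 4), () => { cleanedUp = true })
    val all = it.toList
    (all, cleanedUp)
  }
}
```

In this sketch `findFirstEven()` returns with the cleanup flag still `false`, while `drain()` sets it. A consumer that may stop early would therefore need its own `try`/`finally`, or some other owner of the stream, to guarantee the close.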






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r813479549



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {

Review comment:
       Sorry last trivial comment. Can we define this func as:
   
   ```scala
   override def getNext(): InternalRow = if (sizeOfNextRow >= 0) {
     ...
   }
   ```
   
   ? I think that will reduce the diff.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812318264



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      private val taskContext: TaskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.addTaskCompletionListener[Unit] { _ => closeIfNeeded() }
+      }

Review comment:
       Shall we simplify it like the following?
   ```scala
   - private val taskContext: TaskContext = TaskContext.get()
   - if (taskContext != null) {
   -   taskContext.addTaskCompletionListener[Unit] { _ => closeIfNeeded() }
   - }
   + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
   ```
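
The two forms are equivalent because `Option(x)` yields `None` when `x` is `null`. Below is a minimal sketch with a hypothetical `FakeContext` standing in for `TaskContext`, so that it runs without Spark. The assumption, raised elsewhere in this thread, is that `TaskContext.get()` returns `null` outside a running task, in which case neither form registers anything.

```scala
object NullableContextDemo {
  /** Hypothetical stand-in for TaskContext: it only counts registered listeners. */
  final class FakeContext {
    var listeners: Int = 0
    def addCompletionListener(f: () => Unit): Unit = listeners += 1
  }

  /** Explicit null check, as in the first version of the patch. */
  def registerWithNullCheck(ctx: FakeContext, cleanup: () => Unit): Unit = {
    if (ctx != null) {
      ctx.addCompletionListener(cleanup)
    }
  }

  /** Option-based form: Option(null) is None, so foreach does nothing. */
  def registerWithOption(ctx: FakeContext, cleanup: () => Unit): Unit =
    Option(ctx).foreach(_.addCompletionListener(cleanup))
}
```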






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812319583



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      private val taskContext: TaskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.addTaskCompletionListener[Unit] { _ => closeIfNeeded() }
+      }

Review comment:
       In addition, shall we declare this before `new NextIterator[InternalRow] {` instead of here?






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812369858



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       I'm wondering if `addTaskCompletionListener` is actually necessary. Isn't it the case that `TaskContext` is only available on executor threads?






[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812622524



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case t: Throwable if ins != null =>
+              ins.close()
+              throw t
+          }
+        } else {
+          finished = true
+          null
+        }

Review comment:
       Can we call `close` when we find it is finished?






[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812355206



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case e: Exception =>
+              if (ins != null) {
+                ins.close()
+              }

Review comment:
       Should we also set `finished` to true in the exceptional case?
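
A sketch of what this suggestion could look like when combined with catching `Throwable` and guarding the very first `readInt()`: on any failure the stream is closed, the iterator is marked finished, and the error is rethrown. `TrackedStream`, `ClosingRowReader`, and `guarded` are hypothetical names for this example, and a plain `Iterator` is used instead of Spark's `NextIterator` so that the block runs without Spark on the classpath.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

/** Demo-only stream that records whether close() was called. */
class TrackedStream(bytes: Array[Byte]) extends ByteArrayInputStream(bytes) {
  var closed = false
  override def close(): Unit = {
    closed = true
    super.close()
  }
}

/** Reads [int size][bytes] records until a negative size is seen. */
class ClosingRowReader(in: TrackedStream) extends Iterator[Array[Byte]] {
  private val ins = new DataInputStream(in)
  private var finished = false
  private var sizeOfNextRow = -1

  // Guard the first read too, so a truncated header cannot leak the stream.
  guarded(advance())

  /** Runs `body`; on any Throwable closes the stream, marks finished, rethrows. */
  private def guarded[T](body: => T): T =
    try body catch {
      case t: Throwable =>
        finished = true
        ins.close()
        throw t
    }

  /** Reads the next size; a negative size ends the iteration and closes the stream. */
  private def advance(): Unit = {
    sizeOfNextRow = ins.readInt()
    if (sizeOfNextRow < 0) {
      finished = true
      ins.close()
    }
  }

  override def hasNext: Boolean = !finished

  override def next(): Array[Byte] = {
    if (finished) throw new NoSuchElementException("no more rows")
    guarded {
      val bs = new Array[Byte](sizeOfNextRow)
      ins.readFully(bs)
      advance()
      bs
    }
  }
}
```

With a truncated payload such as `Array[Byte](0, 0, 0, 4, 1, 2)`, the call to `next()` fails with an `EOFException`, after which the stream is closed and `hasNext` reports `false`.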






[GitHub] [spark] kevins-29 edited a comment on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
kevins-29 edited a comment on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048066739


   Hi @dongjoon-hyun,
   
   I ran the above tests on a build from master. The test was run using AppleJDK-11.0.5.10.1.
   
   Did you export `SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"` before starting spark-shell? This "locks in" the amount of RSS taken up by the heap at the start of the process, so that any further growth can be attributed to native libraries.
   
   Also, the assumption was that the default `-Xmx1g` would be used.
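
One way to confirm that the JVM options actually reached the driver is to inspect them from inside the running process, for example by pasting the following into spark-shell. This is only a sketch: the object name `JvmFlagCheck` and its members are hypothetical, while the `java.lang.management` calls are standard JDK APIs.

```scala
import java.lang.management.ManagementFactory

object JvmFlagCheck {
  // Arguments the JVM was actually started with (e.g. those passed via SPARK_SUBMIT_OPTS).
  def inputArgs: Seq[String] =
    ManagementFactory.getRuntimeMXBean.getInputArguments.toArray().map(_.toString).toSeq

  // True if the pre-touch option discussed above is present on the command line.
  def hasPreTouch: Boolean = inputArgs.contains("-XX:+AlwaysPreTouch")

  // Initial and maximum heap sizes in bytes; either may be -1 if the JVM reports it as undefined.
  def initialHeapBytes: Long = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getInit
  def maxHeapBytes: Long = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getMax
}
```

If `hasPreTouch` is false, or the initial heap size is far below the maximum, RSS growth during the test may still include heap pages being touched for the first time, which would blur the native-memory measurement.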




[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812636084



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       Oh, based on https://github.com/apache/spark/pull/35613#discussion_r812633201, when `getNext` reaches the end, `close` will be called, so I think we may not need to add another way to call it.






[GitHub] [spark] HyukjinKwon commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1049411116


   Looks fine to me too.




[GitHub] [spark] HyukjinKwon commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1049536976


   I am done reviewing; this looks pretty good.




[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1050022795


   Thank you, @kevins-29 , @viirya , @HyukjinKwon , @sunchao .
   Merged to master/branch-3.2 for Apache Spark 3.3 and 3.2.2.
   
   @kevins-29 . I added you to the Apache Spark contributor group and assigned SPARK-38273 to you.
   Welcome to the Apache Spark community.




[GitHub] [spark] dongjoon-hyun commented on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048005900


   cc @viirya , @sunchao , @HyukjinKwon 




[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812327795



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      private val taskContext: TaskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.addTaskCompletionListener[Unit] { _ => closeIfNeeded() }
+      }

Review comment:
       The `closeIfNeeded()` method is a member of NextIterator.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812520628



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {
+          try {
+            val bs = new Array[Byte](sizeOfNextRow)
+            ins.readFully(bs)
+            val row = new UnsafeRow(nFields)
+            row.pointTo(bs, sizeOfNextRow)
+            sizeOfNextRow = ins.readInt()
+            row
+          } catch {
+            case e: Exception =>

Review comment:
       Or `case t: Throwable if ins != null =>`






[GitHub] [spark] sunchao commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812546706



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       I think one problem is that `Dataset.toLocalIterator` indirectly uses `decodeUnsafeRows`:
   ```scala
   // Dataset:
     def toLocalIterator(): java.util.Iterator[T] = {
       withAction("toLocalIterator", queryExecution) { plan =>
         val fromRow = resolvedEnc.createDeserializer()
         plan.executeToIterator().map(fromRow).asJava
       }
     }
   
   // SparkPlan:
     def executeToIterator(): Iterator[InternalRow] = {
       getByteArrayRdd().map(_._2).toLocalIterator.flatMap(decodeUnsafeRows)
     }
   ```
   Since the iterator is handed to clients after `Dataset.toLocalIterator`, there is no way for Spark to know how the iterator will be used or whether it will be completely drained. Therefore, it seems impossible to know when we should call `close` on the input stream.
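
The concern can be reproduced outside Spark with a small sketch. It assumes an iterator that closes its stream only when `hasNext` observes exhaustion; `PartialDrainSketch` and its `open` helper are hypothetical names, and the stream here carries plain integers rather than `UnsafeRow` bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object PartialDrainSketch {
  // Returns an iterator over the encoded ints plus a probe reporting whether the stream was closed.
  def open(values: Seq[Int]): (Iterator[Int], () => Boolean) = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    values.foreach(v => out.writeInt(v))
    out.flush()

    val ins = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    var closed = false
    var remaining = values.length

    val rows = new Iterator[Int] {
      override def hasNext: Boolean = {
        // The stream is released only once the consumer asks for more after the last element.
        if (remaining == 0 && !closed) { ins.close(); closed = true }
        remaining > 0
      }
      override def next(): Int = {
        if (!hasNext) throw new NoSuchElementException("End of stream")
        remaining -= 1
        ins.readInt()
      }
    }
    (rows, () => closed)
  }
}
```

A caller that drains the iterator (for example with `toList`) triggers the close, whereas a caller that takes one element and abandons the iterator leaves the stream open. With a finalizer-free native stream, that second case is where memory would stay allocated.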
   
   
   






[GitHub] [spark] kevins-29 edited a comment on pull request #35613: [SPARK-38273][SQL] Fix native memory leak in SparkPlan when spark.io.compression.codec=zstd

Posted by GitBox <gi...@apache.org>.
kevins-29 edited a comment on pull request #35613:
URL: https://github.com/apache/spark/pull/35613#issuecomment-1048066739


   Hi @dongjoon-hyun,
   
   I ran the above tests on a build from master. The test was run using OpenJDK 11.
   
   Did you export `SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"` before starting spark-shell? This "locks in" the amount of RSS taken up by the heap at the start of the process, so that any further growth can be attributed to native libraries.
   
   Also, the assumption was that the default `-Xmx1g` would be used.




[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812641062



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       Mmmm, it seems about the same:
   
   ```scala
       val iter: Iterator[InternalRow] = new Iterator[InternalRow] {
         private var sizeOfNextRow = ins.readInt()
         override def hasNext: Boolean = sizeOfNextRow >= 0
         override def next(): InternalRow = {
           try {
             val bs = new Array[Byte](sizeOfNextRow)
             ins.readFully(bs)
             val row = new UnsafeRow(nFields)
             row.pointTo(bs, sizeOfNextRow)
             sizeOfNextRow = ins.readInt()
             row
           } catch {
             // Close the stream before propagating any failure.
             case t: Throwable if ins != null =>
               ins.close()
               throw t
           }
         }
       }
       
       // Spell out the type parameters; they are unlikely to be inferred from `iter` alone.
       new CompletionIterator[InternalRow, Iterator[InternalRow]](iter) {
         def completion(): Unit = ins.close()
       }
   ```






[GitHub] [spark] kevins-29 commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
kevins-29 commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812365874



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {
+        if (sizeOfNextRow >= 0) {

Review comment:
       I think it would be redundant, as `getNext()` is only called from `hasNext`, and only if `!finished`:
   ```scala
   override def hasNext: Boolean = {
     if (!finished) {
       if (!gotNext) {
         nextValue = getNext()
         if (finished) {
           closeIfNeeded()
         }
         gotNext = true
       }
     }
     !finished
   }
   ```
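
The contract described here can be exercised end to end with a simplified stand-in for `NextIterator`. Everything in this sketch (`NextIteratorSketch`, `SimpleNextIterator`, `encode`, `decode`, `closeCalls`) is hypothetical and written for illustration; it mirrors the `hasNext` logic quoted above and the length-prefixed, `-1`-terminated layout that the diff reads, but it decodes raw byte arrays instead of `UnsafeRow`s.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object NextIteratorSketch {
  // Simplified stand-in for Spark's NextIterator (not the real class).
  abstract class SimpleNextIterator[A] extends Iterator[A] {
    private var gotNext = false
    private var closed = false
    private var nextValue: A = null.asInstanceOf[A]
    protected var finished = false

    protected def getNext(): A   // must set finished = true when there is no next element
    protected def close(): Unit

    def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

    override def hasNext: Boolean = {
      if (!finished && !gotNext) {
        nextValue = getNext()
        if (finished) closeIfNeeded()
        gotNext = true
      }
      !finished
    }

    override def next(): A = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      gotNext = false
      nextValue
    }
  }

  // Writes each record as <length><bytes>, followed by a -1 terminator.
  def encode(records: Seq[Array[Byte]]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    records.foreach { r => out.writeInt(r.length); out.write(r) }
    out.writeInt(-1)
    out.flush()
    bos.toByteArray
  }

  var closeCalls = 0  // counts how often the underlying stream was closed

  def decode(bytes: Array[Byte]): Iterator[Array[Byte]] = {
    val ins = new DataInputStream(new ByteArrayInputStream(bytes))
    new SimpleNextIterator[Array[Byte]] {
      private var sizeOfNextRow = ins.readInt()
      override protected def getNext(): Array[Byte] = {
        if (sizeOfNextRow >= 0) {
          val bs = new Array[Byte](sizeOfNextRow)
          ins.readFully(bs)
          sizeOfNextRow = ins.readInt()
          bs
        } else {
          finished = true
          null
        }
      }
      override protected def close(): Unit = { closeCalls += 1; ins.close() }
    }
  }
}
```

Draining `decode(encode(...))` returns the records in order and closes the stream exactly once, at the point where `getNext()` reports the end by setting `finished`.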






[GitHub] [spark] viirya commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r812622855



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,31 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeIfNeeded()))
       private var sizeOfNextRow = ins.readInt()

Review comment:
       Maybe we can use `CompletionIterator` to make sure `close` is called when this iterator reaches the end?
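
For readers unfamiliar with the idea, a completion iterator is a thin wrapper that runs a callback the first time the wrapped iterator reports that it is exhausted. The sketch below is a generic illustration under that assumption; `CompletionSketch.withCompletion` is a hypothetical helper, not Spark's `CompletionIterator` class.

```scala
object CompletionSketch {
  // Wraps `sub` and evaluates `completion` once, when `sub.hasNext` first returns false.
  def withCompletion[A](sub: Iterator[A])(completion: => Unit): Iterator[A] = new Iterator[A] {
    private var completed = false

    override def next(): A = sub.next()

    override def hasNext: Boolean = {
      val more = sub.hasNext
      if (!more && !completed) {
        completed = true
        completion
      }
      more
    }
  }
}
```

Passing a stream's `close()` call as the callback would release the stream when the consumer drains the iterator, though, like any exhaustion-based approach, it does nothing for a consumer that stops early.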






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r813479549



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {

Review comment:
       Sorry, one last comment. Can we define this function as follows?
   
   ```scala
   override def getNext(): InternalRow = if (sizeOfNextRow >= 0) {
     ...
   }
   ```
   
   I think that will reduce the diff.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35613: [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying input streams

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35613:
URL: https://github.com/apache/spark/pull/35613#discussion_r813480630



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -384,17 +385,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(codec.compressedInputStream(bis))
 
-    new Iterator[InternalRow] {
+    new NextIterator[InternalRow] {
       private var sizeOfNextRow = ins.readInt()
-      override def hasNext: Boolean = sizeOfNextRow >= 0
-      override def next(): InternalRow = {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        sizeOfNextRow = ins.readInt()
-        row
+      override def getNext(): InternalRow = {

Review comment:
       It's minor, but I am suggesting it to make the backport easier (fewer conflicts) and the change easier to review.



