Posted to reviews@spark.apache.org by "ivoson (via GitHub)" <gi...@apache.org> on 2023/03/30 15:53:09 UTC

[GitHub] [spark] ivoson opened a new pull request, #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

ivoson opened a new pull request, #40610:
URL: https://github.com/apache/spark/pull/40610

   ### What changes were proposed in this pull request?
   Add a destructive iterator to `SparkResult` and change `Dataset.toLocalIterator` to use the destructive iterator.
   With the destructive iterator, we will:
   1. Close each `ColumnarBatch` once its data has been consumed;
   2. Remove the `ColumnarBatch` from `SparkResult.batches`.
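
A minimal sketch of this behavior, not the PR's actual code: `FakeBatch` and `FakeResult` are hypothetical stand-ins for `ColumnarBatch` and `SparkResult`, and rows are plain `Int`s rather than decoded Arrow data. It only illustrates closing and dropping each batch once the iterator has moved past it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ColumnarBatch: holds rows and records whether it was closed.
final class FakeBatch(val rows: Seq[Int]) extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

// Hypothetical, simplified stand-in for SparkResult.
final class FakeResult(input: Seq[FakeBatch]) {
  private val idxToBatches = mutable.Map.empty[Int, FakeBatch]
  input.zipWithIndex.foreach { case (batch, idx) => idxToBatches.put(idx, batch) }

  def remainingBatches: Int = idxToBatches.size

  def iterator: Iterator[Int] = buildIterator(destructive = false)
  def destructiveIterator: Iterator[Int] = buildIterator(destructive = true)

  private def buildIterator(destructive: Boolean): Iterator[Int] = new Iterator[Int] {
    private var nextIdx = 0
    private var currentIdx = -1
    private var current: Iterator[Int] = Iterator.empty

    override def hasNext: Boolean = {
      var exhausted = false
      while (!current.hasNext && !exhausted) {
        // The current batch is fully consumed: in destructive mode, drop and close it.
        if (destructive && currentIdx >= 0) {
          idxToBatches.remove(currentIdx).foreach(_.close())
        }
        currentIdx = -1
        idxToBatches.get(nextIdx) match {
          case Some(batch) =>
            current = batch.rows.iterator
            currentIdx = nextIdx
            nextIdx += 1
          case None =>
            exhausted = true
        }
      }
      current.hasNext
    }

    override def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      current.next()
    }
  }
}
```

With two batches, draining `destructiveIterator` should leave `remainingBatches` at 0 and every batch closed, while draining `iterator` leaves both batches in place and open.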
   
   ### Why are the changes needed?
   Instead of keeping everything in memory for the lifetime of the `SparkResult` object, clean it up as soon as we know we are done with it.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests added.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168175875


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,13 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed only for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {
+    idxToBatches.values.toSeq

Review Comment:
   On the other hand, I would prefer using `PrivateMethodTester` rather than exposing a new function.
   
   





[GitHub] [spark] ivoson commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1567045509

   > @ivoson Can you resolve the conflict?
   
   Thanks @LuciferYang, merged the master branch.




[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1171415246


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,13 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed only for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {
+    idxToBatches.values.toSeq

Review Comment:
   Thanks, sorted the results by key. And `PrivateMethodTester` works for private methods.
   Do you think we should mark the new function as `private` and use `PrivateMethodTester` to invoke it?





[GitHub] [spark] LuciferYang commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1491250312

   ```
   2023-03-30T16:09:39.9363333Z [info] - Dataset result destructive iterator *** FAILED *** (84 milliseconds)
   2023-03-30T16:09:39.9382605Z [info]   org.apache.spark.sql.vectorized.ColumnarBatch@3f5ed920 equaled org.apache.spark.sql.vectorized.ColumnarBatch@3f5ed920 (ClientE2ETestSuite.scala:819)
   2023-03-30T16:09:39.9383550Z [info]   org.scalatest.exceptions.TestFailedException:
   2023-03-30T16:09:39.9384640Z [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   2023-03-30T16:09:39.9385936Z [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   2023-03-30T16:09:39.9387002Z [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   2023-03-30T16:09:39.9388072Z [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   2023-03-30T16:09:39.9389136Z [info]   at org.apache.spark.sql.ClientE2ETestSuite.$anonfun$new$106(ClientE2ETestSuite.scala:819)
   2023-03-30T16:09:39.9390070Z [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   2023-03-30T16:09:39.9391014Z [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   2023-03-30T16:09:39.9392246Z [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   2023-03-30T16:09:39.9393644Z [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   2023-03-30T16:09:39.9394518Z [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   2023-03-30T16:09:39.9395634Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   2023-03-30T16:09:39.9396587Z [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   2023-03-30T16:09:39.9397490Z [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   2023-03-30T16:09:39.9398540Z [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   2023-03-30T16:09:39.9399811Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   2023-03-30T16:09:39.9400900Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   2023-03-30T16:09:39.9401857Z [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   2023-03-30T16:09:39.9402904Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   2023-03-30T16:09:39.9403910Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   2023-03-30T16:09:39.9405036Z [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   2023-03-30T16:09:39.9406066Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   2023-03-30T16:09:39.9407048Z [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   2023-03-30T16:09:39.9407986Z [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   2023-03-30T16:09:39.9409109Z [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   2023-03-30T16:09:39.9410244Z [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   2023-03-30T16:09:39.9411594Z [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   2023-03-30T16:09:39.9412839Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   2023-03-30T16:09:39.9414688Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   2023-03-30T16:09:39.9416225Z [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   2023-03-30T16:09:39.9416768Z [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   2023-03-30T16:09:39.9417907Z [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   2023-03-30T16:09:39.9419535Z [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   2023-03-30T16:09:39.9420225Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   2023-03-30T16:09:39.9421710Z [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   2023-03-30T16:09:39.9422826Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   2023-03-30T16:09:39.9423713Z [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   2023-03-30T16:09:39.9436163Z [info]   at org.apache.spark.sql.ClientE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(ClientE2ETestSuite.scala:41)
   2023-03-30T16:09:39.9437232Z [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   2023-03-30T16:09:39.9455327Z [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   2023-03-30T16:09:39.9456678Z [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   2023-03-30T16:09:39.9457909Z [info]   at org.apache.spark.sql.ClientE2ETestSuite.run(ClientE2ETestSuite.scala:41)
   2023-03-30T16:09:39.9459138Z [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   2023-03-30T16:09:39.9460291Z [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   2023-03-30T16:09:39.9461481Z [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   2023-03-30T16:09:39.9462543Z [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   2023-03-30T16:09:39.9463721Z [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2023-03-30T16:09:39.9464883Z [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2023-03-30T16:09:39.9465861Z [info]   at java.lang.Thread.run(Thread.java:750)
   ```
   @ivoson The new test failed




[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1218169052


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -30,21 +31,33 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.scalactic.TolerantNumerics
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils.port
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
-class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
+class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {
+  // Helper methods for accessing private methods/fields
+  private val _idxToBatches =

Review Comment:
   Makes sense. Thanks, done.





[GitHub] [spark] LuciferYang commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1558517257

   @ivoson Can you resolve the conflict?
   
   




[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1167475549


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -134,24 +134,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
-      private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+      private[this] var currentBatch: ColumnarBatch = _
+      private[this] val _destructive: Boolean = destructive

Review Comment:
   Thanks, done.





[GitHub] [spark] ivoson commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1510302202

   Looking into the test failure.




[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168134446


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,13 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed only for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {
+    idxToBatches.values.toSeq

Review Comment:
   The order of `idxToBatches.values.toSeq` may differ from the order of the `idxToBatches.put` calls, and this depends on the Scala version.
   
   For example:
   ```
   val idxToBatches = scala.collection.mutable.Map.empty[Int, Long]
   idxToBatches.put(0, 1)
   idxToBatches.put(1, 2)
   idxToBatches.put(2, 3)
   idxToBatches.put(3, 4)
   idxToBatches.put(4, 5)
   idxToBatches.put(5, 6)
   ```
   
   The result of `idxToBatches.values.toSeq` is `3, 6, 5, 2, 4, 1` with Scala 2.12 and `1, 2, 3, 4, 5, 6` with Scala 2.13. I think sorting should be added to make it stable.
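
One way to make the order stable, sketched under the assumption that the keys are the insertion indices: sort the entries by key before taking the values. `SortedBatches.orderedValues` is a hypothetical helper name, not part of the PR.

```scala
import scala.collection.mutable

object SortedBatches {
  // Returns the map's values ordered by their integer key,
  // so the result does not depend on the map's internal hash order.
  def orderedValues[V](m: scala.collection.Map[Int, V]): Seq[V] =
    m.toSeq.sortBy(_._1).map(_._2)
}
```

Applied to the map from the example above, this should yield the values in key order on both Scala versions.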





[GitHub] [spark] amaliujia commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1218561769


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -142,24 +144,39 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)

Review Comment:
   Why do we still keep `iterator`? Because it is already an API?





[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168174768


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -70,22 +76,24 @@ private[sql] class SparkResult[T](
         val reader = new ArrowStreamReader(ipcStreamBytes.newInput(), allocator)
         try {
           val root = reader.getVectorSchemaRoot
-          if (batches.isEmpty) {
-            if (structType == null) {
-              // If the schema is not available yet, fallback to the schema from Arrow.
-              structType = ArrowUtils.fromArrowSchema(root.getSchema)
-            }
-            // TODO: create encoders that directly operate on arrow vectors.
+          if (structType == null) {
+            // If the schema is not available yet, fallback to the schema from Arrow.
+            structType = ArrowUtils.fromArrowSchema(root.getSchema)
+          }
+          // TODO: create encoders that directly operate on arrow vectors.
+          if (boundEncoder == null) {
             boundEncoder = createEncoder(structType).resolveAndBind(structType.toAttributes)
           }
+

Review Comment:
   Please revert this line.





[GitHub] [spark] hvanhovell commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1153957184


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -45,7 +45,7 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[sql] val batches = mutable.Buffer.empty[ColumnarBatch]

Review Comment:
   Please expose the metric you need for testing instead of making mutable state more visible.





[GitHub] [spark] LuciferYang commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1558516547

   Friendly ping @hvanhovell, is this one OK?




[GitHub] [spark] amaliujia commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1218561769


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -142,24 +144,39 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)

Review Comment:
   Just for my own education, to understand the intention of the current implementation: is this `iterator` still useful somewhere, and is that why we keep it?





[GitHub] [spark] hvanhovell commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1153980266


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -134,24 +134,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
-      private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+      private[this] var currentBatch: ColumnarBatch = _
+      private[this] val _destructive: Boolean = destructive
+
       override def hasNext: Boolean = {
         if (iterator.hasNext) {
           return true
         }
-        val nextBatchIndex = batchIndex + 1
+        val batchIndex = batches.indexOf(currentBatch)

Review Comment:
   I have been looking at this for a bit now. I am not sure if I like it. There are two issues:
   - In destructive mode you know the location of the current batch: it should be at index 0. In non-destructive mode the index should be `batchIndex`. We are not doing anything with that information.
   - The removal can be pretty expensive since we are removing from the head.
   
   I am wondering if we can use a better-suited data structure here. You could use a map, since that will give you cheap removals and fairly fast lookups. Alternatively, we could implement something akin to a linked list (I don't think you can use a stock linked list since those don't like updates during iteration).
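
The map option could look roughly like the sketch below. `IndexedStore` is a hypothetical name used for illustration; the point is that entries are keyed by a monotonically increasing index, so removing a consumed entry does not shift the others, unlike removing the head of an array-backed buffer.

```scala
import scala.collection.mutable

// Hypothetical helper: stores items under a monotonically increasing index.
final class IndexedStore[A] {
  private var nextIndex = 0
  private val items = mutable.Map.empty[Int, A]

  // Adds an item and returns the index it was stored under.
  def append(item: A): Int = {
    val idx = nextIndex
    items.put(idx, item)
    nextIndex += 1
    idx
  }

  // Looks up an item by its original index; indices never change after removals.
  def get(idx: Int): Option[A] = items.get(idx)

  // Removes an item by index without touching any other entry.
  def remove(idx: Int): Option[A] = items.remove(idx)

  def size: Int = items.size
}
```

Because indices are never reused or shifted, an iterator only needs to remember the integer index of its current batch, in both destructive and non-destructive modes.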





[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1154005920


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -134,24 +134,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
-      private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+      private[this] var currentBatch: ColumnarBatch = _
+      private[this] val _destructive: Boolean = destructive
+
       override def hasNext: Boolean = {
         if (iterator.hasNext) {
           return true
         }
-        val nextBatchIndex = batchIndex + 1
+        val batchIndex = batches.indexOf(currentBatch)

Review Comment:
   Thanks @hvanhovell, I will try to make the change.





[GitHub] [spark] LuciferYang commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1510850421

   This is not related to the current PR, but it seems that a `SparkResult` instance is not thread-safe. Do we need to consider this? @hvanhovell
   
   
   




[GitHub] [spark] ivoson commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1577786719

   Thanks for the review. @LuciferYang @hvanhovell @juliuszsompolski




[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1217782230


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -30,21 +31,33 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.scalactic.TolerantNumerics
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils.port
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
-class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
+class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {
+  // Helper methods for accessing private methods/fields
+  private val _idxToBatches =

Review Comment:
   I suggest moving `_idxToBatches` and `getColumnarBatches` inside the `Dataset result destructive iterator` test, because they are only used by that test.
   
   





[GitHub] [spark] LuciferYang commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1507952082

   @ivoson any update on this one?




[GitHub] [spark] hvanhovell commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1577761092

   Merging to master.




[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1167475609


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -45,7 +45,7 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[sql] val batches = mutable.Buffer.empty[ColumnarBatch]

Review Comment:
   Sure, changed back to private.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168218532


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -142,24 +150,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
       private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+
       override def hasNext: Boolean = {
         if (iterator.hasNext) {
           return true
         }
+
         val nextBatchIndex = batchIndex + 1
-        val hasNextBatch = if (nextBatchIndex == batches.size) {
+        if (destructive && idxToBatches.contains(batchIndex)) {
+          val currentBatch = idxToBatches(batchIndex)

Review Comment:
   If `currentBatch` is only used to close the `ColumnarBatch`, how about:
   ```scala
   if (destructive) {
     idxToBatches.remove(batchIndex).foreach(_.close())
   }
   ```
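
The suggestion above relies on `mutable.Map.remove` returning an `Option`, so a missing key (for example the initial `batchIndex` of -1) is a no-op. A minimal sketch of that behavior follows; `FakeBatch` and `DestructiveRemove.release` are hypothetical stand-ins for illustration and are not part of the PR:

```scala
import scala.collection.mutable

// Hypothetical stand-in for ColumnarBatch; it only records whether close() ran.
final class FakeBatch extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

object DestructiveRemove {
  // Removes the batch stored under `index`, closes it if it existed,
  // and reports whether anything was released.
  def release(batches: mutable.Map[Int, FakeBatch], index: Int): Boolean = {
    val removed = batches.remove(index) // Some(batch) if present, None otherwise
    removed.foreach(_.close())
    removed.isDefined
  }
}
```

With this shape no separate `contains` check is needed, because the lookup, the removal, and the close happen in one pass.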





[GitHub] [spark] ivoson commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1520993028

   Hi @hvanhovell, could you please take a look at this PR? Thanks.




[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1213950001


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,14 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {

Review Comment:
   Maybe we should use `PrivateMethodTester` to get `idxToBatches` in the test.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168234098


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -863,6 +865,38 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }.getMessage
     assert(message.contains("PARSE_SYNTAX_ERROR"))
   }
+
+  test("Dataset result destructive iterator") {
+    val df = spark.range(0, 10, 1, 10)
+      .filter("id > 5 and id < 9")
+    val res = df.collectResult()
+
+    try {

Review Comment:
   Use `withResult`?





[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1171412580


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -142,24 +150,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
       private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+
       override def hasNext: Boolean = {
         if (iterator.hasNext) {
           return true
         }
+
         val nextBatchIndex = batchIndex + 1
-        val hasNextBatch = if (nextBatchIndex == batches.size) {
+        if (destructive && idxToBatches.contains(batchIndex)) {
+          val currentBatch = idxToBatches(batchIndex)

Review Comment:
   Yes, that looks better. Thanks.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -70,22 +76,24 @@ private[sql] class SparkResult[T](
         val reader = new ArrowStreamReader(ipcStreamBytes.newInput(), allocator)
         try {
           val root = reader.getVectorSchemaRoot
-          if (batches.isEmpty) {
-            if (structType == null) {
-              // If the schema is not available yet, fallback to the schema from Arrow.
-              structType = ArrowUtils.fromArrowSchema(root.getSchema)
-            }
-            // TODO: create encoders that directly operate on arrow vectors.
+          if (structType == null) {
+            // If the schema is not available yet, fallback to the schema from Arrow.
+            structType = ArrowUtils.fromArrowSchema(root.getSchema)
+          }
+          // TODO: create encoders that directly operate on arrow vectors.
+          if (boundEncoder == null) {
             boundEncoder = createEncoder(structType).resolveAndBind(structType.toAttributes)
           }
+

Review Comment:
   done.







[GitHub] [spark] hvanhovell closed pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult
URL: https://github.com/apache/spark/pull/40610






[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1216559940


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,14 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {

Review Comment:
   thanks, done.





[GitHub] [spark] ivoson commented on pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on PR #40610:
URL: https://github.com/apache/spark/pull/40610#issuecomment-1509728135

   The latest commits address the comments above. cc @hvanhovell @LuciferYang, please take a look when you have time. Thanks.




[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "ivoson (via GitHub)" <gi...@apache.org>.
ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1171412097


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -863,6 +865,38 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }.getMessage
     assert(message.contains("PARSE_SYNTAX_ERROR"))
   }
+
+  test("Dataset result destructive iterator") {
+    val df = spark.range(0, 10, 1, 10)
+      .filter("id > 5 and id < 9")
+    val res = df.collectResult()
+
+    try {

Review Comment:
   Thanks, done.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1168134446


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -46,7 +46,13 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed only for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {
+    idxToBatches.values.toSeq

Review Comment:
   The order of `idxToBatches.values.toSeq` may differ from the order of the `idxToBatches.put` calls, and the result depends on the Scala version.
   
   For example:
   ```
   val idxToBatches = scala.collection.mutable.Map.empty[Int, Long]
   idxToBatches.put(0, 1)
   idxToBatches.put(1, 2)
   idxToBatches.put(2, 3)
   idxToBatches.put(3, 4)
   idxToBatches.put(4, 5)
   idxToBatches.put(5, 6)
   ```
   
   The result of `idxToBatches.values.toSeq` is `3, 6, 5, 2, 4, 1` with Scala 2.12 and `1, 2, 3, 4, 5, 6` with Scala 2.13. I think a sort should be added to make the order stable.
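
One way to get a stable order, sketched here as an assumption rather than as the change that was eventually merged, is to sort the entries by their index key before taking the values. The `StableBatchOrder` object and `valuesInIndexOrder` name are hypothetical:

```scala
import scala.collection.mutable

object StableBatchOrder {
  // Returns the map's values ordered by ascending key, so the result does not
  // depend on the hash map's internal iteration order (which differs across
  // Scala versions).
  def valuesInIndexOrder[V](idxToBatches: mutable.Map[Int, V]): Seq[V] =
    idxToBatches.toSeq.sortBy(_._1).map(_._2)
}
```

Sorting costs O(n log n) per call, which seems acceptable for a helper that is only exercised from tests.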





[GitHub] [spark] hvanhovell commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1153957374


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##########
@@ -134,24 +134,41 @@ private[sql] class SparkResult[T](
   /**
    * Returns an iterator over the contents of the result.
    */
-  def iterator: java.util.Iterator[T] with AutoCloseable = {
+  def iterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = false)
+
+  /**
+   * Returns an destructive iterator over the contents of the result.
+   */
+  def destructiveIterator: java.util.Iterator[T] with AutoCloseable =
+    buildIterator(destructive = true)
+
+  private def buildIterator(destructive: Boolean): java.util.Iterator[T] with AutoCloseable = {
     new java.util.Iterator[T] with AutoCloseable {
-      private[this] var batchIndex: Int = -1
       private[this] var iterator: java.util.Iterator[InternalRow] = Collections.emptyIterator()
       private[this] var deserializer: Deserializer[T] = _
+      private[this] var currentBatch: ColumnarBatch = _
+      private[this] val _destructive: Boolean = destructive

Review Comment:
   You can just use the `destructive` parameter.
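
This works because an anonymous class in Scala closes over the parameters of the enclosing method, so copying the flag into a `_destructive` field adds nothing. A small sketch under simplified assumptions: the `ClosureOverParameter` object and the `items` buffer are hypothetical and stand in for the real batch storage.

```scala
import scala.collection.mutable

object ClosureOverParameter {
  // The anonymous iterator reads `destructive` directly from the enclosing
  // method's parameter list; no extra field is declared for it.
  def buildIterator(
      items: mutable.Buffer[Int],
      destructive: Boolean): java.util.Iterator[Int] =
    new java.util.Iterator[Int] {
      private[this] var index = 0

      override def hasNext: Boolean =
        if (destructive) items.nonEmpty else index < items.size

      override def next(): Int = {
        if (!hasNext) throw new java.util.NoSuchElementException
        if (destructive) {
          items.remove(0) // drop the element from the backing buffer as it is consumed
        } else {
          val value = items(index)
          index += 1
          value
        }
      }
    }
}
```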


