Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/28 13:02:18 UTC

[GitHub] [spark] eejbyfeldt opened a new pull request, #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in DeserializationStream

eejbyfeldt opened a new pull request, #38428:
URL: https://github.com/apache/spark/pull/38428

   
   ### What changes were proposed in this pull request?
   This PR avoids using exceptions to signal the end of the stream in the implementation of KryoDeserializationStream.
   
   
   ### Why are the changes needed?
   Using exceptions to signal the end of a stream is slow, especially for small streams. It is also problematic because the exception caught in KryoDeserializationStream could also be caused by corrupt data, which the current implementation would silently ignore.
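   A minimal, JDK-only sketch of the two end-of-stream styles, assuming `java.io.DataInputStream` over a `ByteArrayInputStream` as a stand-in for Kryo's `Input`, with `available() > 0` playing the role of an EOF check (only reliable for in-memory streams like this one). `EofStyles` and its methods are hypothetical names, not Spark APIs.

   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

   object EofStyles {
     // Write ints into a byte array (a stand-in for a serialized stream).
     def encode(xs: Seq[Int]): Array[Byte] = {
       val baos = new ByteArrayOutputStream()
       val out = new DataOutputStream(baos)
       xs.foreach(x => out.writeInt(x))
       out.close()
       baos.toByteArray
     }

     // Old style: read until an EOFException is thrown. Any EOFException,
     // including one from a truncated final record, is treated as end of stream.
     def readUntilException(bytes: Array[Byte]): Vector[Int] = {
       val in = new DataInputStream(new ByteArrayInputStream(bytes))
       val buf = Vector.newBuilder[Int]
       try {
         while (true) buf += in.readInt()
       } catch {
         case _: EOFException => () // end of stream, or corrupt data
       }
       buf.result()
     }

     // New style: check for remaining input before each read. No exception is
     // created on the normal path, and a truncated record still fails loudly.
     def readUntilEof(bytes: Array[Byte]): Vector[Int] = {
       val in = new DataInputStream(new ByteArrayInputStream(bytes))
       val buf = Vector.newBuilder[Int]
       while (in.available() > 0) buf += in.readInt()
       buf.result()
     }
   }
   ```

   With the last record truncated (e.g. `bytes.dropRight(2)`), `readUntilException` quietly returns the first two values, while `readUntilEof` throws `EOFException` from the partial read, so the corruption is not mistaken for a normal end of stream.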
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, some methods on KryoDeserializationStream no longer raise EOFException.
   
   
   
   ### How was this patch tested?
   Existing tests.
   
   This PR only changes KryoDeserializationStream as a proof of concept. If this is the direction we want to go, we should probably change DeserializationStream instead, so that the interface is consistent.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1190146651


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream. To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoIteratorBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()

Review Comment:
   The initial size of `ByteArrayOutputStream` is 32 bytes. Will the growth of the underlying `byte[]` and the resulting GC affect the test results? If so, is it possible to estimate a reasonable initial size?
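   A sketch of one way to keep buffer growth out of the measurement, using only JDK classes: pass an estimated capacity to the `ByteArrayOutputStream(int size)` constructor. The 4-bytes-per-element estimate is an assumption that holds only for `DataOutputStream.writeInt`; Kryo's encoded sizes differ by type and value, so a real estimate would need tuning per element type. `PresizedBuffer` is a hypothetical helper.

   ```scala
   import java.io.{ByteArrayOutputStream, DataOutputStream}

   object PresizedBuffer {
     // Estimated bytes needed; 4 bytes per element matches DataOutputStream.writeInt
     // (an assumption; Kryo's encoded sizes differ). Never below the JDK default of 32.
     def estimateSize(numElements: Int, bytesPerElement: Int = 4): Int =
       math.max(32, numElements * bytesPerElement)

     // Serialize with a pre-sized buffer so the backing byte[] does not have to
     // be grown and copied while writing.
     def serialize(xs: Array[Int]): Array[Byte] = {
       val baos = new ByteArrayOutputStream(estimateSize(xs.length))
       val out = new DataOutputStream(baos)
       xs.foreach(x => out.writeInt(x))
       out.close()
       baos.toByteArray
     }
   }
   ```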
   



[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1378799229

   I ran the master branch again and used an executor with the same cpu.
   ```
   OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
   Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
   Benchmark KryoPool vs old"pool of 1" implementation:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   -----------------------------------------------------------------------------------------------------------------------------------
   KryoPool:true                                                 9375          12171         NaN          0.0    18750400.9       1.0X
   KryoPool:false                                               13849          16799         NaN          0.0    27697646.0       0.7X
   ```
   
   Based on this, it looks like the branch might be a bit faster. But I think it might also be within the noise, and one would need a more specific benchmark that creates a lot of small streams for the difference to show up. I think it is only expected to be better on the order of a few percent, in the "worst case" when we are creating lots of small streams.


[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1523086807

   This slipped through my radar, thanks for the ping.
   Can you update to latest master @eejbyfeldt ?


[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1537845261

   My plan was to update the benchmarks, but I did not get around to uploading the results until today. The branch should now be updated with an up-to-date run.


[GitHub] [spark] eejbyfeldt commented on a diff in pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in DeserializationStream

Posted by GitBox <gi...@apache.org>.
eejbyfeldt commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1010340881


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {

Review Comment:
   Yes, will fix.



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        readValue[Any]()
+      } else {
+        finished = true
+        null
+      }
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
+
+  final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        (readKey[Any](), readValue[Any]())

Review Comment:
   You mean that if only a key exists, we just ignore it like the current implementation would?



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {
+    if (input == null) {
+      return false
+    }
+
+    val eof = input.eof()
+    if (eof) close()
+    !eof
+  }
+
   override def readObject[T: ClassTag](): T = {
-    try {
       kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case e: KryoException
-        if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
-        throw new EOFException
-    }

Review Comment:
   Sure, will add it back. I think catching and ignoring the exceptions here should be revisited in a separate change, since assuming that this exception always means EOF could cause data loss.
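   The proposed `hasNext` in the diff above treats a `null` input as exhausted and closes the stream as soon as `input.eof()` reports the end. A JDK-only sketch of the same shape, assuming a one-byte peek through `PushbackInputStream` as a stand-in for Kryo's `Input.eof()`; `PeekingStream` is a hypothetical class, not Spark code.

   ```scala
   import java.io.{ByteArrayInputStream, InputStream, PushbackInputStream}

   // Hypothetical wrapper mirroring the shape of the proposed hasNext: a null
   // input means "exhausted", and reaching EOF closes the stream. A one-byte
   // peek through PushbackInputStream stands in for Kryo's Input.eof().
   class PeekingStream(underlying: InputStream) {
     private[this] var input: PushbackInputStream = new PushbackInputStream(underlying)

     def hasNext: Boolean = {
       if (input == null) return false
       val b = input.read()
       val eof = b == -1
       if (eof) close() else input.unread(b) // push the peeked byte back
       !eof
     }

     // Reads one byte; callers are expected to check hasNext first.
     def readByte(): Int = input.read()

     def isClosed: Boolean = input == null

     def close(): Unit = {
       if (input != null) {
         input.close()
         input = null
       }
     }
   }
   ```

   Usage: wrap `new ByteArrayInputStream(bytes)`, loop `while (s.hasNext)` calling `readByte()`; after the last byte, `hasNext` returns `false` and the stream is already closed.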



##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      next
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case of batchIterator has not been initialized
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext
     }
 
     override def next(): (K, C) = {
-      if (!hasNext) {
+      if (batchIterator == null) {

Review Comment:
   In that case it will call `next` on the empty iterator, and we should still throw a `NoSuchElementException`. But `!hasNext` should also give that behavior, so I can change it back to that.
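   A simplified, hypothetical model of the refactored `hasNext`/`next` in the `ExternalAppendOnlyMap` diff above, with in-memory batches standing in for the deserialized batch streams (`BatchedReader` is not a Spark class). It illustrates the point made here: guarding `next()` with `!hasNext` rather than `batchIterator == null` also covers an exhausted batch, so callers consistently get `NoSuchElementException`.

   ```scala
   import java.util.NoSuchElementException

   // Hypothetical model: items are stored in fixed-size batches, and
   // nextBatchIterator() returns the next batch's iterator or null when none remain.
   class BatchedReader[T](batches: Seq[Seq[T]], batchSize: Int) extends Iterator[T] {
     private[this] val remaining = batches.iterator
     private[this] var batchIterator: Iterator[T] = null
     private[this] var objectsRead = 0

     private def nextBatchIterator(): Iterator[T] =
       if (remaining.hasNext) remaining.next().iterator else null

     override def hasNext: Boolean = {
       if (batchIterator == null) {
         // In case batchIterator has not been initialized yet.
         batchIterator = nextBatchIterator()
         if (batchIterator == null) return false
       }
       batchIterator.hasNext
     }

     override def next(): T = {
       // !hasNext also covers an exhausted (non-null) batch iterator.
       if (!hasNext) throw new NoSuchElementException("End of stream")
       val item = batchIterator.next()
       objectsRead += 1
       if (objectsRead == batchSize) {
         // Batch boundary: switch to the next batch.
         objectsRead = 0
         batchIterator = nextBatchIterator()
       }
       item
     }
   }
   ```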



[GitHub] [spark] dongjoon-hyun commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1379292695

   Thank you for sharing the result. Without a clear win, it's hard for us to accept this proposal because this is one of the crucial parts.
   - Could you add a benchmark for your specific cases (lots of small streams)? 
   - If there is no regression in the existing benchmarks, your new benchmark can provide us more explicit evidence of this PR's contribution and help us to build a consensus on this direction.
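   The kind of benchmark requested above could start from something like this JDK-only sketch, which reads many one-element streams either by catching `EOFException` or by checking for remaining input first. It is hypothetical (`SmallStreamsSketch` is not a Spark class), uses `DataInputStream` as a stand-in for a Kryo stream, and has none of the warm-up or repetition control of Spark's `Benchmark` utility, so its timings would only be indicative.

   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

   object SmallStreamsSketch {
     // A serialized stream holding a single int.
     def oneElementStream(x: Int): Array[Byte] = {
       val baos = new ByteArrayOutputStream(4)
       val out = new DataOutputStream(baos)
       out.writeInt(x)
       out.close()
       baos.toByteArray
     }

     // Reads to the end by catching EOFException (one exception per stream).
     def sumWithException(bytes: Array[Byte]): Long = {
       val in = new DataInputStream(new ByteArrayInputStream(bytes))
       var sum = 0L
       try { while (true) sum += in.readInt() } catch { case _: EOFException => () }
       sum
     }

     // Reads to the end by checking for remaining input (no exception).
     def sumWithCheck(bytes: Array[Byte]): Long = {
       val in = new DataInputStream(new ByteArrayInputStream(bytes))
       var sum = 0L
       while (in.available() > 0) sum += in.readInt()
       sum
     }

     // Returns (checksum, elapsed nanoseconds) for reading every stream with `read`.
     def time(streams: Array[Array[Byte]], read: Array[Byte] => Long): (Long, Long) = {
       val start = System.nanoTime()
       var total = 0L
       streams.foreach(s => total += read(s))
       (total, System.nanoTime() - start)
     }
   }
   ```

   For example, build a few hundred thousand one-element streams with `Array.tabulate` and compare the elapsed times returned by `time` for the two readers; the checksums should match.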


[GitHub] [spark] srowen commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1408722738

   Wait, am I reading this wrong, or is the new approach slower? useIterator=true cases seem to take a tiny bit longer. If that's right, what's the benefit?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1537102791

   Looks fine to me.
   We can merge once @dongjoon-hyun's comment is addressed.


[GitHub] [spark] srowen commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1542206911

   Merged to master


[GitHub] [spark] srowen commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1536431558

   Looks OK to me; @mridulm ?


[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1011229019


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        readValue[Any]()
+      } else {
+        finished = true
+        null
+      }
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
+
+  final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        (readKey[Any](), readValue[Any]())

Review Comment:
   Or potentially do something better.
   
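   ```
     // Inside getNext(): qualify hasNext so it refers to the stream, not the iterator.
     if (KryoDeserializationStream.this.hasNext) {
       val key = readKey[Any]()
       if (KryoDeserializationStream.this.hasNext) {
         (key, readValue[Any]())
       } else {
         finished = true
         null
       }
     } else {
       finished = true
       null
     }
   ```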
   
   But given this is enough of a corner case, I would consider this change mostly a nit.
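   A runnable, generic sketch of the suggested two-step check, assuming a plain `Iterator` of alternating keys and values in place of the Kryo stream (`KeyValuePairing.pairs` is a hypothetical helper, not Spark code). Like the current implementation discussed above, it drops a trailing key that has no value rather than reporting it; it just avoids attempting to read a value past the end.

   ```scala
   object KeyValuePairing {
     // Pairs up alternating keys and values; a dangling final key ends iteration.
     def pairs[A](flat: Iterator[A]): Iterator[(A, A)] = new Iterator[(A, A)] {
       private[this] var pending: Option[(A, A)] = None

       // Read a key, then only read a value if one is actually present.
       private def fetch(): Option[(A, A)] =
         if (flat.hasNext) {
           val key = flat.next()
           if (flat.hasNext) Some((key, flat.next())) else None // dangling key dropped
         } else None

       override def hasNext: Boolean = {
         if (pending.isEmpty) pending = fetch()
         pending.isDefined
       }

       override def next(): (A, A) = {
         if (!hasNext) throw new NoSuchElementException("no more pairs")
         val p = pending.get
         pending = None
         p
       }
     }
   }
   ```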
   



[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1410514876

   > Wait, am I reading this wrong, or is the new approach slower? useIterator=true cases seem to take a tiny bit longer. If that's right, what's the benefit?
   
   I think you are reading it wrong. Yes, comparing `useIterator=true` to `useIterator=false`, the iterator is probably a bit slower. (This could be because the `useIterator=false` path uses a correctly sized array when collecting the data, or just because there is overhead in using the iterator.) But that is not the point of the comparison/benchmark.
   
   The goal of this PR was to avoid throwing exceptions to indicate the end of the stream, as creating the exception (and filling in its stack trace) is fairly expensive for small streams. (I think relying on EOFException is also problematic for correctness, as we might misinterpret an EOFException from e.g. a truncated buffer and silently ignore missing data. But the initial review said that we wanted to keep the existing `DeserializationStream` behavior, so it is kept here.) The point of the benchmark is that running it on a commit prior to this change gave results like
   
   ```
   OpenJDK 64-Bit Server VM 17.0.5+8 on Linux 5.15.0-1031-azure
   Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
   Benchmark of kryo asIterator on deserialization stream:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ---------------------------------------------------------------------------------------------------------------------------------------------
   Colletion of int with 1 elements, useIterator: true                      113            114           1          0.1       11274.8       1.0X
   Colletion of int with 10 elements, useIterator: true                     125            126           0          0.1       12547.7       0.9X
   Colletion of int with 100 elements, useIterator: true                    243            244           1          0.0       24324.0       0.5X
   Colletion of string with 1 elements, useIterator: true                   112            112           0          0.1       11207.8       1.0X
   Colletion of string with 10 elements, useIterator: true                  135            135           0          0.1       13459.3       0.8X
   Colletion of string with 100 elements, useIterator: true                 350            351           1          0.0       35010.5       0.3X
   Colletion of Array[int] with 1 elements, useIterator: true               112            113           1          0.1       11235.7       1.0X
   Colletion of Array[int] with 10 elements, useIterator: true              132            132           1          0.1       13167.5       0.9X
   Colletion of Array[int] with 100 elements, useIterator: true             334            334           1          0.0       33351.4       0.3X
   Colletion of int with 1 elements, useIterator: false                      14             14           0          0.7        1364.5       8.3X
   Colletion of int with 10 elements, useIterator: false                     24             25           0          0.4        2414.8       4.7X
   Colletion of int with 100 elements, useIterator: false                   132            132           0          0.1       13171.3       0.9X
   Colletion of string with 1 elements, useIterator: false                   15             15           0          0.7        1504.1       7.5X
   Colletion of string with 10 elements, useIterator: false                  36             37           1          0.3        3633.5       3.1X
   Colletion of string with 100 elements, useIterator: false                242            242           1          0.0       24161.7       0.5X
   Colletion of Array[int] with 1 elements, useIterator: false               15             15           0          0.7        1457.2       7.7X
   Colletion of Array[int] with 10 elements, useIterator: false              32             33           0          0.3        3222.4       3.5X
   Colletion of Array[int] with 100 elements, useIterator: false            219            219           0          0.0       21873.1       0.5X
   ```
   
   And in this branch:
   ```
   OpenJDK 64-Bit Server VM 17.0.5+8 on Linux 5.15.0-1031-azure
   Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
   Benchmark of kryo asIterator on deserialization stream:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ---------------------------------------------------------------------------------------------------------------------------------------------
   Colletion of int with 1 elements, useIterator: true                       22             25           2          0.5        2160.1       1.0X
   Colletion of int with 10 elements, useIterator: true                      37             41           2          0.3        3742.8       0.6X
   Colletion of int with 100 elements, useIterator: true                    192            199           5          0.1       19184.5       0.1X
   Colletion of string with 1 elements, useIterator: true                    24             26           1          0.4        2384.5       0.9X
   Colletion of string with 10 elements, useIterator: true                   51             58           3          0.2        5092.8       0.4X
   Colletion of string with 100 elements, useIterator: true                 338            355          12          0.0       33833.5       0.1X
   Colletion of Array[int] with 1 elements, useIterator: true                24             28           5          0.4        2372.5       0.9X
   Colletion of Array[int] with 10 elements, useIterator: true               48             53           3          0.2        4841.6       0.4X
   Colletion of Array[int] with 100 elements, useIterator: true             341            356           8          0.0       34118.0       0.1X
   Colletion of int with 1 elements, useIterator: false                      21             25           3          0.5        2103.1       1.0X
   Colletion of int with 10 elements, useIterator: false                     37             42           3          0.3        3729.3       0.6X
   Colletion of int with 100 elements, useIterator: false                   171            181          10          0.1       17105.7       0.1X
   Colletion of string with 1 elements, useIterator: false                   24             27           2          0.4        2410.6       0.9X
   Colletion of string with 10 elements, useIterator: false                  52             57           3          0.2        5213.2       0.4X
   Colletion of string with 100 elements, useIterator: false                343            354           8          0.0       34284.3       0.1X
   Colletion of Array[int] with 1 elements, useIterator: false               23             26           2          0.4        2339.2       0.9X
   Colletion of Array[int] with 10 elements, useIterator: false              46             53           7          0.2        4637.8       0.5X
   Colletion of Array[int] with 100 elements, useIterator: false            302            326          12          0.0       30210.0       0.1X
   ```
   
   The second run was on a significantly slower CPU, so the times cannot be compared directly between the two runs. The main takeaway is that by not throwing an exception when reaching the end of the stream (it was actually two exceptions, since we raised a different one after catching the one from Kryo), we reduced the performance gap between using the iterator interface and not using it.
   
   Avoiding the iterator interface entirely would be even better. However, the benchmark only avoids it by exploiting the fact that it knows how many items to expect in the stream, and I believe that in our normal use cases this information is usually not available.
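For illustration, a minimal sketch of the two end-of-stream strategies using only `java.io` (no Kryo, no Spark). `readAllWithException` treats `EOFException` as the stopping condition, roughly like the old code path; `readAllWithCheck` asks whether input remains before each read, analogous to the `input.eof()` check in this PR. The object and method names are hypothetical, and `available()` is only an exact remaining-byte count for in-memory streams such as `ByteArrayInputStream`, so it is not a general EOF check.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

object EndOfStreamSketch {
  // Write Ints into a byte array, standing in for a serialized stream.
  def encode(xs: Seq[Int]): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val out = new DataOutputStream(baos)
    xs.foreach(out.writeInt)
    out.close()
    baos.toByteArray
  }

  // Exception-driven: keep reading until the reader throws EOFException.
  def readAllWithException(bytes: Array[Byte]): Vector[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val buf = Vector.newBuilder[Int]
    try {
      while (true) buf += in.readInt()
    } catch {
      case _: EOFException => // the end of every stream costs one exception
    } finally {
      in.close()
    }
    buf.result()
  }

  // Check-driven: only read while input remains, so no exception is thrown.
  def readAllWithCheck(bytes: Array[Byte]): Vector[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val buf = Vector.newBuilder[Int]
    try {
      while (in.available() > 0) buf += in.readInt()
    } finally {
      in.close()
    }
    buf.result()
  }
}
```

Both return the same elements; the difference is that the second never constructs and throws an exception (including filling in its stack trace) at the end of each stream, which adds up when many small streams are read.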


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1083538101


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()
+        val serStream = ser.serializeStream(baos)
+        var i = 0
+        while (i < elements.length) {
+          serStream.writeObject(elements(i))
+          i += 1
+        }
+        serStream.close()
+        baos.toByteArray
+      }
+
+      val deserStream = ser.deserializeStream(new ByteArrayInputStream(serialized))
+      if (useIterator) {
+        if (deserStream.asIterator.toArray.length == elements.length) 1 else 0
+      } else {
+        val res = new Array[T](elements.length)
+        var i = 0
+        while (i < elements.length) {
+          res(i) = deserStream.readValue()
+          i += 1
+        }
+        deserStream.close()
+        if (res.length == elements.length) 1 else 0
+      }
+    }
+
+    def createCase[T: ClassTag](name: String, elementCount: Int, createElement: => T): Unit = {
+      val elements = Array.fill[T](elementCount)(createElement)
+
+      benchmark.addCase(
+        s"Colletion of $name with $elementCount elements, useIterator: $useIterator") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < N) {
+          sum += roundTrip(elements, useIterator, ser)
+          i += 1
+        }
+        sum
+      }
+    }
+
+    createCase("int", 1, Random.nextInt)
+    createCase("int", 10, Random.nextInt)
+    createCase("int", 100, Random.nextInt)
+    createCase("string", 1, Random.nextString(5))
+    createCase("string", 10, Random.nextString(5))
+    createCase("string", 100, Random.nextString(5))

Review Comment:
   Could you add a more complex data structure, like Array?
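A self-contained sketch of how such a case could build its elements, assuming the benchmark's existing `createCase(name, elementCount, createElement: => T)` shape. Because `createElement` is a by-name parameter, `Array.fill` evaluates it once per element, so every element is a distinct `Array[Int]`. The inner length of 10 and the names `ArrayCaseSketch` and `arrayOfIntElements` are assumptions for illustration.

```scala
import scala.reflect.ClassTag
import scala.util.Random

object ArrayCaseSketch {
  // Mirrors the element construction in the benchmark's createCase:
  // `createElement` is by-name, so Array.fill re-evaluates it per element.
  def createElements[T: ClassTag](elementCount: Int, createElement: => T): Array[T] =
    Array.fill[T](elementCount)(createElement)

  // Hypothetical extra case: each element is itself an Array[Int] of length 10.
  def arrayOfIntElements(elementCount: Int): Array[Array[Int]] =
    createElements(elementCount, Array.fill(10)(Random.nextInt()))
}
```

In the benchmark itself this would correspond to a hypothetical call such as `createCase("Array[int]", 10, Array.fill(10)(Random.nextInt()))`.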





[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1299630091

   The PR as such looks reasonable to me - can we add a test to explicitly test for EOF behavior?
   
   +CC @JoshRosen who had worked on this in the distant past :-)
   +CC @Ngone51 
   
   I want to make sure there are more eyes on this change.
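One possible shape for such a test, as a sketch only: a hypothetical `IntStream` stands in for the deserialization stream, detects the end of input with a check rather than an exception, and closes itself on EOF. `runChecks` holds the assertions a test could make: an empty stream yields nothing, all elements are returned, the stream is closed once exhausted, and `next()` past the end fails. A real test would exercise `KryoDeserializationStream.asIterator` instead.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object EofTestSketch {
  // Hypothetical stand-in for a deserialization stream of Ints.
  final class IntStream(bytes: Array[Byte]) {
    private val in = new DataInputStream(new ByteArrayInputStream(bytes))
    private var closed = false
    def isClosed: Boolean = closed

    // Detects end of input with a check instead of an exception, and closes on EOF.
    def asIterator: Iterator[Int] = new Iterator[Int] {
      def hasNext: Boolean = {
        if (!closed && in.available() == 0) {
          in.close()
          closed = true
        }
        !closed
      }
      def next(): Int = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        in.readInt()
      }
    }
  }

  def streamOf(xs: Int*): IntStream = {
    val baos = new ByteArrayOutputStream()
    val out = new DataOutputStream(baos)
    xs.foreach(out.writeInt)
    out.close()
    new IntStream(baos.toByteArray)
  }

  // The EOF-behavior checks a test could make.
  def runChecks(): Unit = {
    val empty = streamOf()
    assert(!empty.asIterator.hasNext, "empty stream yields nothing")
    assert(empty.isClosed, "empty stream is closed on EOF")

    val s = streamOf(1, 2, 3)
    val it = s.asIterator
    assert(it.toList == List(1, 2, 3), "all elements are read")
    assert(s.isClosed, "stream is closed once exhausted")
    assert(scala.util.Try(it.next()).isFailure, "next() past the end fails")
  }
}
```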
   




[GitHub] [spark] AmplabJenkins commented on pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in DeserializationStream

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1295039656

   Can one of the admins verify this patch?




[GitHub] [spark] eejbyfeldt commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1193411810


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()

Review Comment:
   GC may make the benchmark noisier, but it should not introduce any bias, should it?
   
   Choosing a bigger initial size would reduce the issue for some of the cases, since the buffer would not need to resize, but I cannot see a simple way to estimate the total size in general. Maybe using a bigger initial size is good enough?
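One way to get an exact initial size, sketched under the assumption that the elements of a case stay fixed across iterations (as they do in `createCase`): serialize once during setup, then pass the resulting length to `new ByteArrayOutputStream(size)` for the timed round trips. `DataOutputStream.writeInt` stands in for `serStream.writeObject` here, and the object and method names are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object PresizedBufferSketch {
  // Stand-in for the benchmark's serialization loop; writeInt replaces
  // serStream.writeObject to keep the sketch self-contained.
  // The default of 32 matches the no-arg ByteArrayOutputStream constructor.
  def serialize(elements: Array[Int], initialSize: Int = 32): Array[Byte] = {
    val baos = new ByteArrayOutputStream(initialSize)
    val out = new DataOutputStream(baos)
    var i = 0
    while (i < elements.length) {
      out.writeInt(elements(i))
      i += 1
    }
    out.close()
    baos.toByteArray
  }

  // Elements are fixed per case, so a single setup-time serialization gives the
  // exact byte count; pre-allocating it means the buffer never grows while timed.
  def exactSizeFor(elements: Array[Int]): Int = serialize(elements).length
}
```

`toByteArray` still copies the buffer, so this removes only the grow-and-copy steps, not every allocation in the round trip.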





[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in DeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1008798658


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {

Review Comment:
   Is `private[this]` sufficient here (without `final`)?



##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      next
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case of batchIterator has not been initialized
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext
     }
 
     override def next(): (K, C) = {
-      if (!hasNext) {
+      if (batchIterator == null) {

Review Comment:
   We need `hasNext` here: if the current iterator is exhausted, it will be non-null but empty.
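A simplified, self-contained sketch of the point above: when items come from a sequence of batches, an exhausted batch iterator is non-null but empty, so `next()` has to go through a `hasNext` that advances past exhausted (or empty) batches. `BatchedIterator` is a hypothetical reduction, not the actual spill reader in `ExternalAppendOnlyMap`.

```scala
object BatchedIteratorSketch {
  // Hypothetical reduction of a batched spill reader: each batch yields an
  // iterator, and an exhausted batch iterator stays non-null but empty.
  final class BatchedIterator[A](batches: Iterator[Seq[A]]) extends Iterator[A] {
    private var batchIterator: Iterator[A] = null

    override def hasNext: Boolean = {
      // Advance past exhausted or empty batches until an item is available.
      while ((batchIterator == null || !batchIterator.hasNext) && batches.hasNext) {
        batchIterator = batches.next().iterator
      }
      batchIterator != null && batchIterator.hasNext
    }

    override def next(): A = {
      // A `batchIterator == null` check alone would miss an exhausted batch.
      if (!hasNext) throw new NoSuchElementException("no more items")
      batchIterator.next()
    }
  }
}
```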



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {
+    if (input == null) {
+      return false
+    }
+
+    val eof = input.eof()
+    if (eof) close()
+    !eof
+  }
+
   override def readObject[T: ClassTag](): T = {
-    try {
       kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case e: KryoException
-        if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
-        throw new EOFException
-    }

Review Comment:
   Should we preserve this even with the proposed EOF check, to continue catching cases where EOF is encountered prematurely?
   This would mainly handle abnormal cases rather than the common case.
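A sketch of why the catch stays useful: an up-front end-of-input check handles the normal end of a stream, but a truncated record still passes that check and then fails mid-read. Here a hypothetical `Input` class mimics the two relevant behaviors (a cheap `eof()` and a generic "Buffer underflow." exception), and `readObject` maps that message to `EOFException`, mirroring the `KryoException` handling in the diff. None of these classes are Kryo's.

```scala
import java.io.EOFException
import java.util.Locale

object PrematureEofSketch {
  // Hypothetical minimal reader standing in for Kryo's Input: it reports eof()
  // cheaply and throws a generic exception when a record is cut short.
  final class Input(bytes: Array[Byte]) {
    private var pos = 0
    def eof(): Boolean = pos >= bytes.length
    def readInt(): Int = {
      if (bytes.length - pos < 4) throw new IllegalStateException("Buffer underflow.")
      val v = ((bytes(pos) & 0xff) << 24) | ((bytes(pos + 1) & 0xff) << 16) |
        ((bytes(pos + 2) & 0xff) << 8) | (bytes(pos + 3) & 0xff)
      pos += 4
      v
    }
  }

  // Abnormal end (truncated record): the underflow is still mapped to EOFException.
  def readObject(input: Input): Int =
    try input.readInt()
    catch {
      case e: IllegalStateException
          if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
        throw new EOFException
    }

  // Normal end of stream: detected with eof(), so no exception is thrown.
  def readAll(bytes: Array[Byte]): Vector[Int] = {
    val input = new Input(bytes)
    val buf = Vector.newBuilder[Int]
    while (!input.eof()) buf += readObject(input)
    buf.result()
  }
}
```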



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        readValue[Any]()
+      } else {
+        finished = true
+        null
+      }
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
+
+  final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        (readKey[Any](), readValue[Any]())

Review Comment:
   Given we are fixing this, should we avoid assuming that if a key is present, the value will be as well?
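A minimal sketch of a key/value iterator that does not assume a value follows every key: the end-of-input check is repeated between reading the key and reading the value, so a stream that ends right after a key fails with an explicit `EOFException` naming that key. The object, the method, and the Int-pair wire format (written with `DataOutputStream.writeInt`) are assumptions for illustration.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}

object KeyValueIteratorSketch {
  // Reads (key, value) pairs of Ints written as k1, v1, k2, v2, ...
  def asKeyValueIterator(bytes: Array[Byte]): Iterator[(Int, Int)] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    new Iterator[(Int, Int)] {
      def hasNext: Boolean = in.available() > 0

      def next(): (Int, Int) = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        val key = in.readInt()
        // Do not assume a value follows: check again before reading it.
        if (in.available() == 0) {
          throw new EOFException(s"stream ended after key $key without a value")
        }
        (key, in.readInt())
      }
    }
  }
}
```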





[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1377871914

   I want to see if we can get this into 3.4; more eyes on it would be good.
   
   +CC @Ngone51, @srowen, @dongjoon-hyun 




[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1407369412

   Rebased and added the results from all the benchmarks.
   
   It is not clear to me whether this is a regression or whether we are just looking at noise. Is there some way to make the benchmark more reproducible?
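One source of run-to-run variation that is easy to remove is the input data itself: the benchmark generates elements with the unseeded `Random.nextInt` and `Random.nextString(5)`. A minimal sketch, with hypothetical names, of generating them from a fixed seed instead, so that every run and every branch being compared serializes identical data. This does not address JIT, GC, or machine-level noise.

```scala
import scala.util.Random

object SeededInputsSketch {
  // Hypothetical helpers: generate benchmark elements from a fixed seed so that
  // every run (and every branch being compared) serializes identical data.
  def intElements(count: Int, seed: Long): Array[Int] = {
    val rnd = new Random(seed)
    Array.fill(count)(rnd.nextInt())
  }

  def stringElements(count: Int, length: Int, seed: Long): Array[String] = {
    val rnd = new Random(seed)
    Array.fill(count)(rnd.nextString(length))
  }
}
```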




[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1090260160


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -329,6 +339,42 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext(): Any = {
+      if (KryoDeserializationStream.this.hasNext) {
+        try {
+          return readValue[Any]()

Review Comment:
   Use `readObject` instead?





[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1380312253

   > The PR as such looks reasonable to me - can we add a test to explicitly test for EOF behavior?
   
   @mridulm  I added a spec for this in: https://github.com/apache/spark/pull/38428/commits/77e616a910cbe7330c612e7ae8a34707c3bc8fb1
   
   
   > Could you add a benchmark for your specific cases (lots of small streams)?
   
   Added a benchmark showing that, on current master, `asIterator.toArray` has overhead compared to just reading the expected number of elements, and that this overhead goes away on this branch. I added results from master with only the benchmark added (this branch: https://github.com/eejbyfeldt/spark/tree/SPARK-40912-only-adding-benchmark) in https://github.com/apache/spark/pull/38428/commits/75806338f9f56cb4ef9bbac4035e825ef07b5ab8 and then overwrote them with results from this branch in https://github.com/apache/spark/pull/38428/commits/bc011c639619be65656b081edf2e1f8bcd91be44




[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1408128678

   Any additional thoughts, @dongjoon-hyun? I want to make sure this patch is well reviewed before merging it.
   +CC @JoshRosen, @Ngone51 as well for review if you have the bandwidth.




[GitHub] [spark] srowen closed pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen closed pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream 
URL: https://github.com/apache/spark/pull/38428




[GitHub] [spark] LuciferYang commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1190146651


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()

Review Comment:
   The initial size of `ByteArrayOutputStream` is 32 bytes. Will growing the underlying buffer, and the resulting GC, affect the test results? If so, is it possible to estimate a reasonable initial size?
   





[GitHub] [spark] mridulm commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1384775734

   There is a small regression for the `Colletion of int with 100 elements, useIterator: false` case [here](https://github.com/apache/spark/pull/38428/commits/83389eb6e8782f3bf77d6ba92ed7a39d770d8266), but the rest of the numbers look compelling.
   I am curious why [`KryoIteratorBenchmark-results.txt`](https://github.com/apache/spark/pull/38428/commits/83389eb6e8782f3bf77d6ba92ed7a39d770d8266#diff-f4298856b3b343fdb7f43f38a5fb9197b166e3ae9cfc1f258967f7d5dee8fdb1) is not seeing improvements for the last two cases (`int` and `string`).
   
   +CC @dongjoon-hyun to take a look at the results too.




[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1090261774


##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()

Review Comment:
   super nit: rename `next` to `kv` or `item`





[GitHub] [spark] srowen commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1410597122

   OK I get it, thanks.




[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1378675283

   So I ran the benchmark:
   ```
   ```




[GitHub] [spark] LuciferYang commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1190137791


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()
+        val serStream = ser.serializeStream(baos)
+        var i = 0
+        while (i < elements.length) {
+          serStream.writeObject(elements(i))
+          i += 1
+        }
+        serStream.close()
+        baos.toByteArray
+      }
+
+      val deserStream = ser.deserializeStream(new ByteArrayInputStream(serialized))
+      if (useIterator) {
+        if (deserStream.asIterator.toArray.length == elements.length) 1 else 0
+      } else {
+        val res = new Array[T](elements.length)
+        var i = 0
+        while (i < elements.length) {
+          res(i) = deserStream.readValue()
+          i += 1
+        }
+        deserStream.close()
+        if (res.length == elements.length) 1 else 0
+      }
+    }
+
+    def createCase[T: ClassTag](name: String, elementCount: Int, createElement: => T): Unit = {
+      val elements = Array.fill[T](elementCount)(createElement)
+
+      benchmark.addCase(
+        s"Colletion of $name with $elementCount elements, useIterator: $useIterator") { _ =>

Review Comment:
   Typo: Colletion -> Collection





[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1532703025

   > This slipped through my radar, thanks for the ping. Can you update to latest master, @eejbyfeldt?
   
   Updated. I will also rerun the benchmarks, since their results had conflicts.




[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1011229842


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {
+    if (input == null) {
+      return false
+    }
+
+    val eof = input.eof()
+    if (eof) close()
+    !eof
+  }
+
   override def readObject[T: ClassTag](): T = {
-    try {
       kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case e: KryoException
-        if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
-        throw new EOFException
-    }

Review Comment:
   Agreed. We should investigate that, but let us do it separately from this PR, since this change is beneficial even without it.
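   For readers less familiar with the pattern, here is a minimal, dependency-free sketch of what the diff does: rather than letting a read fail and translating that failure into an `EOFException`, the stream checks for end of input before each read, so fully consuming a stream costs a cheap check instead of a thrown exception. The PR relies on Kryo's `Input.eof()` for that check; in this sketch a `java.io.PushbackInputStream` that peeks one byte stands in for it, and `EofPeekSketch`, `encode`, and `IntIterator` are hypothetical names used only for illustration.

   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException, InputStream, PushbackInputStream}

   import scala.collection.mutable.ArrayBuffer

   // Hypothetical sketch: PushbackInputStream stands in for Kryo's Input.eof().
   object EofPeekSketch {
     // Writes ints back to back, with no length prefix or terminator.
     def encode(values: Seq[Int]): Array[Byte] = {
       val baos = new ByteArrayOutputStream()
       val out = new DataOutputStream(baos)
       values.foreach(v => out.writeInt(v))
       out.close()
       baos.toByteArray
     }

     // Old style: read until the stream throws EOFException. Every fully
     // consumed stream pays for constructing and throwing one exception.
     def readAllWithException(bytes: Array[Byte]): List[Int] = {
       val in = new DataInputStream(new ByteArrayInputStream(bytes))
       val buf = ArrayBuffer.empty[Int]
       try {
         while (true) {
           buf += in.readInt()
         }
       } catch {
         case _: EOFException => // end of stream used as the stopping condition
       } finally {
         in.close()
       }
       buf.toList
     }

     // New style: peek one byte before each record, so reaching the end is an
     // ordinary `false` from hasNext instead of an exception.
     final class IntIterator(raw: InputStream) extends Iterator[Int] {
       private val pushback = new PushbackInputStream(raw, 1)
       private val data = new DataInputStream(pushback)
       private var closed = false

       override def hasNext: Boolean = {
         if (closed) {
           false
         } else {
           val b = pushback.read()
           if (b == -1) {
             // Close eagerly at end of input, as the PR's hasNext does.
             data.close()
             closed = true
             false
           } else {
             pushback.unread(b)
             true
           }
         }
       }

       override def next(): Int = {
         if (!hasNext) throw new NoSuchElementException("end of stream")
         data.readInt()
       }
     }
   }
   ```

   A truncated record still fails inside `readInt()` with an `EOFException`, which is arguably the right outcome: only the normal end of the stream stops being exceptional.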





[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1011230912


##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      next
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case of batchIterator has not been initialized
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext
     }
 
     override def next(): (K, C) = {
-      if (!hasNext) {
+      if (batchIterator == null) {

Review Comment:
   The current change looks good to me; resolving the comment.
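   The control flow of the new `readNextItem`/`hasNext` pair can be modeled outside Spark roughly as below. This is a toy under stated assumptions, not the Spark implementation: `SpilledIterator` is a hypothetical name, in-memory `Seq` batches replace the spill file's per-batch deserialization streams, and `nextBatchIterator()` returns `null` when no batches remain, mirroring the convention in the diff.

   ```scala
   // Hypothetical toy model of the batch-chaining logic in the diff; in-memory
   // batches replace the spill file's per-batch deserialization streams.
   final class SpilledIterator[T](batches: Seq[Seq[T]], batchSize: Int) extends Iterator[T] {
     private var batchIndex = 0
     private var batchIterator: Iterator[T] = null
     private var objectsRead = 0

     // Returns an iterator over the next batch, or null when none are left.
     private def nextBatchIterator(): Iterator[T] = {
       if (batchIndex < batches.length) {
         val it = batches(batchIndex).iterator
         batchIndex += 1
         it
       } else {
         null
       }
     }

     private def readNextItem(): T = {
       val item = batchIterator.next()
       objectsRead += 1
       if (objectsRead == batchSize) {
         // A full batch has been consumed: switch to the next batch up front.
         objectsRead = 0
         batchIterator = nextBatchIterator()
       }
       item
     }

     override def hasNext: Boolean = {
       if (batchIterator == null) {
         // Either not initialized yet, or the previous batch was the last one.
         batchIterator = nextBatchIterator()
         if (batchIterator == null) {
           return false
         }
       }
       batchIterator.hasNext
     }

     override def next(): T = {
       if (!hasNext) throw new NoSuchElementException("no more items")
       readNextItem()
     }
   }
   ```

   Because the sketch switches batches as soon as `batchSize` items have been read, `hasNext` only has to consult the current batch iterator; the `null` check covers both the uninitialized state and having moved past the last batch.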





[GitHub] [spark] LuciferYang commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1190137289


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))

Review Comment:
   nit: use `.foreach` instead of `.map` here, since the result of `run` is discarded.
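   To make the nit concrete, assume a hypothetical `run` that, like the benchmark's `run(useIterator, benchmark)`, returns `Unit` and is called only for its side effect of registering a case:

   ```scala
   import scala.collection.mutable.ArrayBuffer

   object ForeachVsMapSketch {
     // Hypothetical stand-in for the benchmark's run(useIterator, benchmark):
     // returns Unit and is called only to register a case.
     def run(useIterator: Boolean, cases: ArrayBuffer[String]): Unit = {
       cases += s"useIterator: $useIterator"
     }

     // `.map` works, but builds a Seq[Unit] that nobody reads.
     def viaMap(cases: ArrayBuffer[String]): Seq[Unit] =
       Seq(true, false).map(useIterator => run(useIterator, cases))

     // `.foreach` says what is meant: iterate only for side effects.
     def viaForeach(cases: ArrayBuffer[String]): Unit =
       Seq(true, false).foreach(useIterator => run(useIterator, cases))
   }
   ```

   Both variants register the same cases; the difference is only the unused `Seq[Unit]` and what the call communicates to a reader.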





[GitHub] [spark] eejbyfeldt commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1522936049

   Ping: does anyone have more thoughts or comments on this?




[GitHub] [spark] dongjoon-hyun commented on pull request #38428: [SPARK-40912][CORE][WIP] Overhead of Exceptions in KryoDeserializationStream

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1377876289

   Thank you for pinging me, @mridulm .
   Also, cc @sunchao too.




[GitHub] [spark] dongjoon-hyun commented on pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #38428:
URL: https://github.com/apache/spark/pull/38428#issuecomment-1399596628

   Got it, @mridulm . Could you rebase this once more, @eejbyfeldt ?




[GitHub] [spark] eejbyfeldt commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1089711014


##########
core/src/test/scala/org/apache/spark/serializer/KryoIteratorBenchmark.scala:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.serializer.KryoTest._
+
+/**
+ * Benchmark for kryo asIterator on a deserialization stream". To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoIteratorBenchmark extends BenchmarkBase {
+  val N = 10000
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark of kryo asIterator on deserialization stream"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).map(useIterator => run(useIterator, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(useIterator: Boolean, benchmark: Benchmark): Unit = {
+    val ser = createSerializer()
+
+    def roundTrip[T: ClassTag](
+        elements: Array[T],
+        useIterator: Boolean,
+        ser: SerializerInstance): Int = {
+      val serialized: Array[Byte] = {
+        val baos = new ByteArrayOutputStream()
+        val serStream = ser.serializeStream(baos)
+        var i = 0
+        while (i < elements.length) {
+          serStream.writeObject(elements(i))
+          i += 1
+        }
+        serStream.close()
+        baos.toByteArray
+      }
+
+      val deserStream = ser.deserializeStream(new ByteArrayInputStream(serialized))
+      if (useIterator) {
+        if (deserStream.asIterator.toArray.length == elements.length) 1 else 0
+      } else {
+        val res = new Array[T](elements.length)
+        var i = 0
+        while (i < elements.length) {
+          res(i) = deserStream.readValue()
+          i += 1
+        }
+        deserStream.close()
+        if (res.length == elements.length) 1 else 0
+      }
+    }
+
+    def createCase[T: ClassTag](name: String, elementCount: Int, createElement: => T): Unit = {
+      val elements = Array.fill[T](elementCount)(createElement)
+
+      benchmark.addCase(
+        s"Colletion of $name with $elementCount elements, useIterator: $useIterator") { _ =>
+        var sum = 0L
+        var i = 0
+        while (i < N) {
+          sum += roundTrip(elements, useIterator, ser)
+          i += 1
+        }
+        sum
+      }
+    }
+
+    createCase("int", 1, Random.nextInt)
+    createCase("int", 10, Random.nextInt)
+    createCase("int", 100, Random.nextInt)
+    createCase("string", 1, Random.nextString(5))
+    createCase("string", 10, Random.nextString(5))
+    createCase("string", 100, Random.nextString(5))

Review Comment:
   Done.





[GitHub] [spark] mridulm commented on a diff in pull request #38428: [SPARK-40912][CORE]Overhead of Exceptions in KryoDeserializationStream

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1090263540


##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      next
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case of batchIterator has not been initialized
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext

Review Comment:
   Review note: this change preserves the existing behavior, but as a general construct, if the returned `batchIterator` turns out to be empty, we won't advance to the next batch iterator.
   This is not expected to be an issue, though.
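   Should empty batches ever become possible, one way to handle the case described above is to keep advancing in `hasNext` until a non-empty batch or the end of input is reached. A hypothetical sketch: the class name and the `() => Iterator[T]` batch supplier are illustrative assumptions, and the diff's `objectsRead` counter is left out for brevity.

   ```scala
   // Hypothetical variant: `nextBatchIterator` returns the next batch, or null
   // once no batches remain. The diff's objectsRead/batch-size counter is omitted.
   final class SkippingBatchIterator[T](nextBatchIterator: () => Iterator[T])
       extends Iterator[T] {
     private var batchIterator: Iterator[T] = null
     private var finished = false

     override def hasNext: Boolean = {
       // Advance past empty (or exhausted) batches instead of stopping at them.
       while (!finished && (batchIterator == null || !batchIterator.hasNext)) {
         batchIterator = nextBatchIterator()
         if (batchIterator == null) {
           finished = true
         }
       }
       !finished
     }

     override def next(): T = {
       if (!hasNext) throw new NoSuchElementException("no more batches")
       batchIterator.next()
     }
   }
   ```

   For a plain Scala `Iterator` of batches, `batches.flatMap(_.iterator)` skips empty batches in the same way; the explicit loop is shown only because the note is about the hand-written `hasNext`.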


