Posted to reviews@spark.apache.org by "eejbyfeldt (via GitHub)" <gi...@apache.org> on 2023/08/24 07:37:56 UTC

[GitHub] [spark] eejbyfeldt commented on pull request #37206: [SPARK-39696][CORE] Ensure Concurrent r/w `TaskMetrics` not throw Exception

eejbyfeldt commented on PR #37206:
URL: https://github.com/apache/spark/pull/37206#issuecomment-1691166611

   Hi @gowa, I don't think the issue you describe has the same root cause; I think it is more related to how serialization works. Consider the following code:
   ```
   $ cat custom_serialization.scala 
   import java.io._
   
   abstract class Acc extends Serializable {
     private def readObject(in: ObjectInputStream): Unit = {
       in.defaultReadObject()
       println(s"readObject Acc ${toString()}")
     }
   }
   
   class MyAcc extends Acc {
     final private val myVar = Map()
     override def toString: String = {
       s"Class MyAcc(myVar=${myVar})"
     }
   }
   
   object Test {
     def main(args: Array[String]): Unit = {
         val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream(1024 * 1024)
         val objectStream: ObjectOutputStream = new ObjectOutputStream(outputStream)
         objectStream.writeObject(new MyAcc)
         objectStream.close()
         val input = new ObjectInputStream(new ByteArrayInputStream(outputStream.toByteArray())) 
         val result = input.readObject()
         println(result)
     }
   }
   ```
   When executed, it prints:
   ```
   $ scala custom_serialization.scala 
   readObject Acc Class MyAcc(myVar=null)
   Class MyAcc(myVar=Map())
   ```
   
   This code is similar to what we have in `AccumulatorV2`, which also has a custom `readObject` method: https://github.com/apache/spark/blob/v3.4.1/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L191. It is from inside that method that the accumulator is registered and therefore becomes accessible from the heartbeat thread. So there is a possibility that the heartbeat thread calls `isZero` after `AccumulatorV2.readObject` has been called but before the default deserialization of your custom class has completed. This whole setup of giving out references to objects that are not fully deserialized is really nasty.
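   
   To make the escaping reference concrete, here is a minimal sketch of the same pattern. It is not Spark's code: `Registry`, `BaseAcc`, `ListAcc` and `Demo` are made-up names, and the lookup inside `readObject` only stands in for what another thread (such as the heartbeat thread) could do at that moment.
   ```scala
   import java.io._
   import java.util.concurrent.ConcurrentHashMap
   
   // Hypothetical stand-in for a global accumulator registry (not Spark's API).
   object Registry {
     val accs = new ConcurrentHashMap[Long, BaseAcc]()
   }
   
   abstract class BaseAcc(val id: Long) extends Serializable {
     def isZero: Boolean
   
     private def readObject(in: ObjectInputStream): Unit = {
       in.defaultReadObject()        // restores only BaseAcc's own fields
       Registry.accs.put(id, this)   // `this` escapes before subclass fields are set
       // Stand-in for another thread reading the registry at this exact moment.
       val seen =
         try Registry.accs.get(id).isZero.toString
         catch { case _: NullPointerException => "NullPointerException" }
       println(s"during readObject: isZero -> $seen")
     }
   }
   
   class ListAcc(accId: Long) extends BaseAcc(accId) {
     private val values = new java.util.ArrayList[String]()
     override def isZero: Boolean = values.isEmpty
   }
   
   object Demo {
     // Serialize and deserialize through an in-memory buffer.
     def roundTrip(acc: BaseAcc): BaseAcc = {
       val bytes = new ByteArrayOutputStream()
       val out = new ObjectOutputStream(bytes)
       out.writeObject(acc)
       out.close()
       val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
       try in.readObject().asInstanceOf[BaseAcc] finally in.close()
     }
   
     def main(args: Array[String]): Unit = {
       val restored = roundTrip(new ListAcc(1L))
       println(s"after readObject: isZero -> ${restored.isZero}")
     }
   }
   ```
   Since Java serialization restores superclass state before subclass state, `values` should still be `null` while `BaseAcc.readObject` runs, so the first line is expected to report a `NullPointerException`, and only the call made after `readObject()` returns sees the fully restored object.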
   
   Here is a PR addressing/discussing the same bug for an accumulator in Spark: https://github.com/apache/spark/pull/31540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

