Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/02 01:24:06 UTC

[spark] branch branch-2.4 updated: [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0d1664c  [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
0d1664c is described below

commit 0d1664c9f8178285c924b44546ebbd059d92e3df
Author: Josh Rosen <ro...@gmail.com>
AuthorDate: Mon Mar 2 10:19:12 2020 +0900

    [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads, then `createDataset`'s use of the encoder may produce incorrect or corrupt results, because the `Encoder`'s internal mutable state is updated from multiple threads concurrently.
    
    Here is an example demonstrating the problem:
    
    ```scala
    import org.apache.spark.sql._
    
    val enc = implicitly[Encoder[(Int, Int)]]
    
    val datasets = (1 to 100).par.map { _ =>
      val pairs = (1 to 100).map(x => (x, x))
      spark.createDataset(pairs)(enc)
    }
    
    datasets.reduce(_ union _).collect().foreach {
      pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
    }
    ```
    
    Before this PR's change, the above example fails because Spark produces corrupted records in which fields from different input records are commingled.
    
    This bug is similar to SPARK-22355 / #19577, the corresponding problem in `Dataset.collect()`.
    
    The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: use `.copy()`. For consistency, I used the same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.
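
    As a rough illustration of why copying helps, here is a minimal sketch that does not use Spark at all. `BufferEncoder`, `EncoderCopySketch`, and `encodeAll` are hypothetical names invented for this example, and the single reusable buffer is only an assumed stand-in for the encoder's internal mutable state; it is not how `ExpressionEncoder` is actually implemented. The sketch only mirrors the shape of the fix: take a private copy of the encoder, then copy each encoded row.

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Hypothetical stand-in for a stateful encoder; NOT Spark's ExpressionEncoder.
    final class BufferEncoder {
      // Internal mutable state that every call to toRow overwrites.
      private val buffer = new Array[Int](2)

      // Writes the pair into the reusable buffer and returns the buffer itself,
      // so the caller must copy the result before the next call.
      def toRow(pair: (Int, Int)): Array[Int] = {
        buffer(0) = pair._1
        buffer(1) = pair._2
        buffer
      }

      // Returns a fresh encoder with its own buffer.
      def copy(): BufferEncoder = new BufferEncoder
    }

    object EncoderCopySketch {
      // Same shape as the fix: copy the encoder first, then copy each row.
      def encodeAll(shared: BufferEncoder, data: Seq[(Int, Int)]): Seq[Array[Int]] = {
        val enc = shared.copy()
        data.map(d => enc.toRow(d).clone())
      }

      def main(args: Array[String]): Unit = {
        val shared = new BufferEncoder
        val pairs = (1 to 100).map(x => (x, x))
        // Several concurrent tasks receive the same encoder instance,
        // but each one works on its own copy inside encodeAll.
        val tasks = (1 to 8).map(_ => Future(encodeAll(shared, pairs)))
        val rows = Await.result(Future.sequence(tasks), 30.seconds).flatten
        require(rows.forall(r => r(0) == r(1)), "Pair elements are mismatched")
        println(s"Encoded ${rows.size} rows with no mismatched pairs")
      }
    }
    ```

    In this sketch, calling `shared.toRow` directly from several tasks would let them overwrite each other's buffer contents, which is the kind of commingling described above. Taking the copy inside `encodeAll` keeps callers unaware of the issue, at the cost of one extra allocation per call.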
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested manually using the example listed above.
    
    Thanks to smcnamara-stripe for identifying this bug.
    
    Closes #26076 from JoshRosen/SPARK-29419.
    
    Authored-by: Josh Rosen <ro...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit f4499f678dc2e9f72c3ee5d2af083aa6b98f3fc2)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 2b847fb..edbd02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -467,7 +467,8 @@ class SparkSession private(
   @Experimental
   @InterfaceStability.Evolving
   def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
-    val enc = encoderFor[T]
+    // `ExpressionEncoder` is not thread-safe, here we create a new encoder.
+    val enc = encoderFor[T].copy()
     val attributes = enc.schema.toAttributes
     val encoded = data.map(d => enc.toRow(d).copy())
     val plan = new LocalRelation(attributes, encoded)

