Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/02 01:22:00 UTC
[spark] branch branch-3.0 updated: [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new aea8749 [SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
aea8749 is described below
commit aea8749a038441a8d2092d448065b0fabd9a0674
Author: Josh Rosen <ro...@gmail.com>
AuthorDate: Mon Mar 2 10:19:12 2020 +0900
[SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
### What changes were proposed in this pull request?
This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if a caller-supplied `Encoder` is shared across multiple threads, `createDataset`'s use of that encoder may produce incorrect or corrupt results, because the encoder's internal mutable state is updated concurrently from those threads.
Here is an example demonstrating the problem:
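```scala
// Run in spark-shell, where `spark` is the active SparkSession.
import org.apache.spark.sql._
import spark.implicits._ // provides the implicit Encoder[(Int, Int)]

// One encoder instance shared by every parallel createDataset call
// (`.par` is built into the Scala 2.12 collections library).
val enc = implicitly[Encoder[(Int, Int)]]
val datasets = (1 to 100).par.map { _ =>
  val pairs = (1 to 100).map(x => (x, x))
  spark.createDataset(pairs)(enc)
}
// Every input record is (x, x), so any mismatch indicates corruption.
datasets.reduce(_ union _).collect().foreach { pair =>
  require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```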
Before this PR's change, the example above fails because Spark produces corrupted records in which fields from different input records have been commingled.
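To make the "mixed fields" failure concrete, here is a toy model in plain Scala. It is not Spark's `ExpressionEncoder`; the hypothetical `ReusingPairEncoder` assumes an encoder that writes every record into one reused output buffer (the reason the diff calls `.copy()` on each encoded row). Replaying one possible interleaving of two threads by hand shows how a shared encoder can return a record whose fields come from two different inputs.

```scala
// Hypothetical toy encoder (not Spark's API): every call writes into the
// same mutable buffer instead of allocating a new one.
final class ReusingPairEncoder {
  private val buffer = new Array[Int](2)

  // Encoding is split into two steps so callers can be interleaved by hand.
  def writeFirst(p: (Int, Int)): Unit = { buffer(0) = p._1 }
  def writeSecond(p: (Int, Int)): Unit = { buffer(1) = p._2 }
  def result: (Int, Int) = (buffer(0), buffer(1))
}

val shared = new ReusingPairEncoder
val a = (1, 1) // record encoded by "thread A"
val b = (2, 2) // record encoded by "thread B"

// One possible interleaving of two threads sharing the encoder:
shared.writeFirst(a)  // A writes field 0
shared.writeFirst(b)  // B overwrites field 0
shared.writeSecond(b) // B writes field 1
val fromB = shared.result
shared.writeSecond(a) // A writes field 1
val fromA = shared.result

println(s"A got $fromA, B got $fromB") // prints "A got (2,1), B got (2,2)"
```

Thread A's record ends up with B's first field and its own second field, the same kind of mismatch the `require` in the example above detects.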
This bug resembles SPARK-22355 / #19577, an analogous problem in `Dataset.collect()`.
The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: copy the encoder with `.copy()` before using it. For consistency, I used the same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.
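A minimal sketch of the fix pattern, again using hypothetical stand-ins (`BufferedPairEncoder`, `encodeAll`) rather than Spark's API: each call copies the shared encoder once, so concurrent callers never write into the same buffer, and each encoded row is copied because the buffer is reused across records within a call. In the diff below, the per-row `.copy()` is an unchanged context line; the patch adds the encoder `.copy()`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stateful encoder: `toRow` returns the same mutable buffer on
// every call; `copy()` returns a fresh instance with its own buffer.
final class BufferedPairEncoder {
  private val buffer = new Array[Int](2)
  def toRow(p: (Int, Int)): Array[Int] = {
    buffer(0) = p._1
    buffer(1) = p._2
    buffer
  }
  def copy(): BufferedPairEncoder = new BufferedPairEncoder
}

// Mirrors the patched createDataset: copy the encoder once per call, then
// copy each encoded row (clone() plays the role of the row .copy()).
def encodeAll(shared: BufferedPairEncoder, data: Seq[(Int, Int)]): Seq[Array[Int]] = {
  val enc = shared.copy()
  data.map(d => enc.toRow(d).clone())
}

// Eight concurrent callers share one encoder, as in the example above.
val sharedEncoder = new BufferedPairEncoder
val jobs = (1 to 8).map { _ =>
  Future(encodeAll(sharedEncoder, (1 to 1000).map(x => (x, x))))
}
val rows = Await.result(Future.sequence(jobs), 30.seconds).flatten

val allMatch = rows.forall(r => r(0) == r(1))
println(s"${rows.size} rows, all pairs match: $allMatch")
```

Copying once per call, rather than per record, keeps the extra allocation in this sketch to one encoder per `encodeAll` call. Without the `shared.copy()` line, all eight futures would write into one buffer and mismatched pairs could appear, although such races are nondeterministic.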
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Tested manually using the example listed above.
Thanks to smcnamara-stripe for identifying this bug.
Closes #26076 from JoshRosen/SPARK-29419.
Authored-by: Josh Rosen <ro...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit f4499f678dc2e9f72c3ee5d2af083aa6b98f3fc2)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 1fb97fb..bca841c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -459,7 +459,8 @@ class SparkSession private(
* @since 2.0.0
*/
def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
- val enc = encoderFor[T]
+ // `ExpressionEncoder` is not thread-safe, here we create a new encoder.
+ val enc = encoderFor[T].copy()
val attributes = enc.schema.toAttributes
val encoded = data.map(d => enc.toRow(d).copy())
val plan = new LocalRelation(attributes, encoded)