You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/09/08 11:55:54 UTC
[spark] branch master updated: [SPARK-31511][FOLLOW-UP][TEST][SQL]
Make BytesToBytesMap iterators thread-safe
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd3dc2f5 [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe
bd3dc2f5 is described below
commit bd3dc2f54d871d152331612c53f586181f4e87fc
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Sep 8 11:54:04 2020 +0000
[SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe
### What changes were proposed in this pull request?
Before SPARK-31511 was fixed, `BytesToBytesMap.iterator()` was not thread-safe and could produce inaccurate data when used concurrently.
This PR adds a unit test that covers that case.
### Why are the changes needed?
Increase test coverage to ensure that iterator() is thread-safe.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test to `HashedRelationSuite`.
Closes #29669 from cxzl25/SPARK-31511-test.
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/joins/HashedRelationSuite.scala | 39 ++++++++++++++++++++++
1 file changed, 39 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 72e921d..caa7bdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -360,6 +360,45 @@ class HashedRelationSuite extends SharedSparkSession {
assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray))
}
+ test("SPARK-31511: Make BytesToBytesMap iterators thread-safe") { // Two threads serialize the same relation at once; thread1's bytes are then verified.
+ val ser = sparkContext.env.serializer.newInstance() // used below for the serialize/deserialize round trip
+ val key = Seq(BoundReference(0, LongType, false)) // key expression: ordinal 0, typed as LongType
+
+ val unsafeProj = UnsafeProjection.create(
+ Seq(BoundReference(0, LongType, false), BoundReference(1, IntegerType, true)))
+ val rows = (0 until 10000).map(i => unsafeProj(InternalRow(Int.int2long(i), i + 1)).copy()) // 10000 rows of (i as Long, i + 1); each projected row is copied
+ val unsafeHashed = UnsafeHashedRelation(rows.iterator, key, 1, mm) // mm is defined outside this hunk
+
+ val os = new ByteArrayOutputStream() // receives thread1's output; compared in the final assert
+ val thread1 = new Thread {
+ override def run(): Unit = {
+ val out = new ObjectOutputStream(os)
+ unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out)
+ out.flush() // push buffered bytes into os before the thread ends
+ }
+ }
+
+ val thread2 = new Thread { // second writer over the same relation; exists only to run alongside thread1
+ override def run(): Unit = {
+ val threadOut = new ObjectOutputStream(new ByteArrayOutputStream()) // throwaway sink; these bytes are never inspected
+ unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(threadOut)
+ threadOut.flush()
+ }
+ }
+
+ thread1.start() // start both before joining either, so the two writes can overlap
+ thread2.start()
+ thread1.join() // wait for both writers to finish before os is read
+ thread2.join()
+
+ val unsafeHashed2 = ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed)) // round-trip through ser to obtain a deserialized copy
+ val os2 = new ByteArrayOutputStream()
+ val out2 = new ObjectOutputStream(os2)
+ unsafeHashed2.writeExternal(out2) // written on the test thread, after both writer threads have been joined
+ out2.flush()
+ assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray)) // thread1's concurrent output must be byte-identical to the copy's output
+ }
+
// This test require 4G heap to run, should run it manually
ignore("build HashedRelation that is larger than 1G") {
val unsafeProj = UnsafeProjection.create(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org