You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/09/08 11:55:54 UTC

[spark] branch master updated: [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3dc2f5 [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe
bd3dc2f5 is described below

commit bd3dc2f54d871d152331612c53f586181f4e87fc
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Sep 8 11:54:04 2020 +0000

    [SPARK-31511][FOLLOW-UP][TEST][SQL] Make BytesToBytesMap iterators thread-safe
    
    ### What changes were proposed in this pull request?
    Before SPARK-31511 was fixed, `BytesToBytesMap.iterator()` was not thread-safe and could produce inaccurate data.
    This PR adds a unit test to guard against a regression.
    
    ### Why are the changes needed?
    Increase test coverage to ensure that iterator() is thread-safe.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a unit test to `HashedRelationSuite`.
    
    Closes #29669 from cxzl25/SPARK-31511-test.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/joins/HashedRelationSuite.scala  | 39 ++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 72e921d..caa7bdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -360,6 +360,45 @@ class HashedRelationSuite extends SharedSparkSession {
     assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray))
   }
 
+  test("SPARK-31511: Make BytesToBytesMap iterators thread-safe") {
+    val ser = sparkContext.env.serializer.newInstance()
+    val key = Seq(BoundReference(0, LongType, false))
+
+    val unsafeProj = UnsafeProjection.create(
+      Seq(BoundReference(0, LongType, false), BoundReference(1, IntegerType, true)))
+    val rows = (0 until 10000).map(i => unsafeProj(InternalRow(Int.int2long(i), i + 1)).copy())
+    val unsafeHashed = UnsafeHashedRelation(rows.iterator, key, 1, mm)
+
+    val os = new ByteArrayOutputStream()
+    val thread1 = new Thread {
+      override def run(): Unit = {
+        val out = new ObjectOutputStream(os)
+        unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out)
+        out.flush()
+      }
+    }
+
+    val thread2 = new Thread {
+      override def run(): Unit = {
+        val threadOut = new ObjectOutputStream(new ByteArrayOutputStream())
+        unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(threadOut)
+        threadOut.flush()
+      }
+    }
+
+    thread1.start()
+    thread2.start()
+    thread1.join()
+    thread2.join()
+
+    val unsafeHashed2 = ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed))
+    val os2 = new ByteArrayOutputStream()
+    val out2 = new ObjectOutputStream(os2)
+    unsafeHashed2.writeExternal(out2)
+    out2.flush()
+    assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray))
+  }
+
   // This test require 4G heap to run, should run it manually
   ignore("build HashedRelation that is larger than 1G") {
     val unsafeProj = UnsafeProjection.create(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org