Posted to commits@spark.apache.org by do...@apache.org on 2020/04/22 01:18:57 UTC

[spark] branch master updated: [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf60384  [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
cf60384 is described below

commit cf6038499d3d5686ab3377f550c2fe507d022fd3
Author: herman <he...@databricks.com>
AuthorDate: Tue Apr 21 18:17:19 2020 -0700

    [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
    
    ### What changes were proposed in this pull request?
    This PR increases the thread safety of the `BytesToBytesMap`:
    - It makes the `iterator()` and `destructiveIterator()` methods use their own `Location` object. This object used to be shared, which caused issues when two different iterators consumed the map in two threads.
    - Removes the `safeIterator()` method, which is no longer needed now that `iterator()` also allocates a fresh `Location`.
    - Improves the thread-safety documentation of the iterator and `lookup()` methods.
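To make the first bullet concrete, here is a minimal, hypothetical Java model (not Spark code). `ToyMap` stands in for `BytesToBytesMap` and its mutable `Cursor` for `Location`; `sharedCursorIterator()` mimics the old behavior, where every iterator wrote into the map's single `loc` field, and `iterator()` mimics the new behavior of allocating one cursor per iterator. Even on a single thread, advancing a second iterator overwrites the entry the first iterator just returned when the cursor is shared.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for BytesToBytesMap; not Spark code.
final class ToyMap {
  // Mutable handle, playing the role of BytesToBytesMap.Location.
  static final class Cursor {
    int key;
    int value;
  }

  private final int[] keys;
  private final int[] values;
  // A single map-owned cursor, like the `loc` field the old iterators reused.
  private final Cursor sharedCursor = new Cursor();

  ToyMap(int[] keys, int[] values) {
    this.keys = keys.clone();
    this.values = values.clone();
  }

  // Old behavior: every iterator writes into the same cursor.
  Iterator<Cursor> sharedCursorIterator() {
    return new CursorIterator(sharedCursor);
  }

  // New behavior: each iterator owns a fresh cursor.
  Iterator<Cursor> iterator() {
    return new CursorIterator(new Cursor());
  }

  private final class CursorIterator implements Iterator<Cursor> {
    private final Cursor cursor;
    private int next = 0;

    CursorIterator(Cursor cursor) {
      this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
      return next < keys.length;
    }

    @Override
    public Cursor next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      // Overwrite the cursor in place instead of allocating a new object.
      cursor.key = keys[next];
      cursor.value = values[next];
      next++;
      return cursor;
    }
  }
}

final class SharedCursorDemo {
  // Keeps a second iterator one entry ahead of the first and records the key
  // held by the first iterator's entry after the second iterator has advanced.
  static List<Integer> keysSeenByFirst(ToyMap map, boolean shareCursor) {
    Iterator<ToyMap.Cursor> first = shareCursor ? map.sharedCursorIterator() : map.iterator();
    Iterator<ToyMap.Cursor> second = shareCursor ? map.sharedCursorIterator() : map.iterator();
    if (second.hasNext()) {
      second.next();
    }
    List<Integer> seen = new ArrayList<>();
    while (first.hasNext()) {
      ToyMap.Cursor entry = first.next();
      if (second.hasNext()) {
        second.next(); // with a shared cursor, this overwrites `entry`
      }
      seen.add(entry.key);
    }
    return seen;
  }

  public static void main(String[] args) {
    ToyMap map = new ToyMap(new int[] {10, 20, 30}, new int[] {1, 2, 3});
    System.out.println("shared cursor:       " + keysSeenByFirst(map, true));
    System.out.println("per-iterator cursor: " + keysSeenByFirst(map, false));
  }
}
```

With keys `{10, 20, 30}`, `keysSeenByFirst(map, true)` yields `[20, 30, 30]`, while `keysSeenByFirst(map, false)` yields `[10, 20, 30]`.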
    
    ### Why are the changes needed?
    It is unexpected that an iterator shares the object it returns with all other iterators. This violates the iterator contract and causes issues when iterators over the same map are consumed in different threads.
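A sketch of the consumption pattern this change is meant to support, assuming a map that is not modified during the scan: each thread creates and drains its own iterator. `PerIteratorCursorMap` and `ConcurrentScanDemo` are hypothetical names, not Spark classes. Because every iterator allocates its own cursor, the threads share only the immutable key array, so each thread's sum is correct; with one shared cursor, as before this change, the threads would race on the cursor's fields and could read each other's entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical read-only map whose iterators each allocate their own cursor.
final class PerIteratorCursorMap {
  static final class Cursor {
    long key;
  }

  private final long[] keys;

  PerIteratorCursorMap(long[] keys) {
    this.keys = keys.clone();
  }

  Iterator<Cursor> iterator() {
    final Cursor cursor = new Cursor(); // never shared with another iterator
    return new Iterator<Cursor>() {
      private int next = 0;

      @Override
      public boolean hasNext() {
        return next < keys.length;
      }

      @Override
      public Cursor next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        cursor.key = keys[next++];
        return cursor;
      }
    };
  }
}

final class ConcurrentScanDemo {
  // Each task creates and drains its own iterator; only the immutable key array is shared.
  static long[] sumInParallel(PerIteratorCursorMap map, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        Callable<Long> task = () -> {
          long sum = 0;
          Iterator<PerIteratorCursorMap.Cursor> it = map.iterator();
          while (it.hasNext()) {
            sum += it.next().key;
          }
          return sum;
        };
        futures.add(pool.submit(task));
      }
      long[] sums = new long[threads];
      for (int t = 0; t < threads; t++) {
        sums[t] = futures.get(t).get();
      }
      return sums;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    PerIteratorCursorMap map = new PerIteratorCursorMap(new long[] {1, 2, 3, 4});
    for (long sum : sumInParallel(map, 4)) {
      System.out.println(sum);
    }
  }
}
```

For keys `{1, 2, 3, 4}`, every thread's sum is `10`.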
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #28286 from hvanhovell/SPARK-31511.
    
    Authored-by: herman <he...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 23 +++++++++-------------
 .../spark/sql/execution/joins/HashedRelation.scala |  2 +-
 2 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 64c240c..6e02888 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -406,17 +406,10 @@ public final class BytesToBytesMap extends MemoryConsumer {
    *
    * For efficiency, all calls to `next()` will return the same {@link Location} object.
    *
-   * If any other lookups or operations are performed on this map while iterating over it, including
-   * `lookup()`, the behavior of the returned iterator is undefined.
+   * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+   * the behavior of the returned iterator is undefined.
    */
   public MapIterator iterator() {
-    return new MapIterator(numValues, loc, false);
-  }
-
-  /**
-   * Returns a thread safe iterator that iterates of the entries of this map.
-   */
-  public MapIterator safeIterator() {
     return new MapIterator(numValues, new Location(), false);
   }
 
@@ -427,19 +420,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
    *
    * For efficiency, all calls to `next()` will return the same {@link Location} object.
    *
-   * If any other lookups or operations are performed on this map while iterating over it, including
-   * `lookup()`, the behavior of the returned iterator is undefined.
+   * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+   * the behavior of the returned iterator is undefined.
    */
   public MapIterator destructiveIterator() {
     updatePeakMemoryUsed();
-    return new MapIterator(numValues, loc, true);
+    return new MapIterator(numValues, new Location(), true);
   }
 
   /**
    * Looks up a key, and return a {@link Location} handle that can be used to test existence
    * and read/write values.
    *
-   * This function always return the same {@link Location} instance to avoid object allocation.
+   * This function always returns the same {@link Location} instance to avoid object allocation.
+   * This function is not thread-safe.
    */
   public Location lookup(Object keyBase, long keyOffset, int keyLength) {
     safeLookup(keyBase, keyOffset, keyLength, loc,
@@ -451,7 +445,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
    * Looks up a key, and return a {@link Location} handle that can be used to test existence
    * and read/write values.
    *
-   * This function always return the same {@link Location} instance to avoid object allocation.
+   * This function always returns the same {@link Location} instance to avoid object allocation.
+   * This function is not thread-safe.
    */
   public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
     safeLookup(keyBase, keyOffset, keyLength, loc, hash);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 4001338..13180d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -177,7 +177,7 @@ private[joins] class UnsafeHashedRelation(
   }
 
   override def keys(): Iterator[InternalRow] = {
-    val iter = binaryMap.safeIterator()
+    val iter = binaryMap.iterator()
 
     new Iterator[InternalRow] {
       val unsafeRow = new UnsafeRow(numKeys)

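The updated Javadoc still states that all calls to `next()` on one iterator return the same `Location` object, and `UnsafeHashedRelation.keys()` allocates a single `UnsafeRow` per iterator (`val unsafeRow = new UnsafeRow(numKeys)`), which suggests the same reuse pattern. Per-iterator objects make separate iterators independent, but a caller that keeps an entry past the next call to `next()` presumably still has to copy it. A minimal, hypothetical Java sketch of that rule (`ReusedEntry` and `ReusedEntryIterator` are illustrative names, not Spark classes):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable entry that an iterator refills on every next() call.
final class ReusedEntry {
  int key;

  ReusedEntry copy() {
    ReusedEntry c = new ReusedEntry();
    c.key = key;
    return c;
  }
}

// Hypothetical iterator that returns the same ReusedEntry from every next() call.
final class ReusedEntryIterator implements Iterator<ReusedEntry> {
  private final int[] keys;
  private final ReusedEntry entry = new ReusedEntry(); // owned by this iterator only
  private int next = 0;

  ReusedEntryIterator(int[] keys) {
    this.keys = keys.clone();
  }

  @Override
  public boolean hasNext() {
    return next < keys.length;
  }

  @Override
  public ReusedEntry next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    entry.key = keys[next++];
    return entry;
  }

  // Keeps every entry, optionally copying it first, then reads the keys back.
  static List<Integer> collectKeys(int[] keys, boolean copy) {
    List<ReusedEntry> kept = new ArrayList<>();
    ReusedEntryIterator it = new ReusedEntryIterator(keys);
    while (it.hasNext()) {
      ReusedEntry e = it.next();
      kept.add(copy ? e.copy() : e); // without copy, all slots alias one object
    }
    List<Integer> result = new ArrayList<>();
    for (ReusedEntry e : kept) {
      result.add(e.key);
    }
    return result;
  }

  public static void main(String[] args) {
    int[] keys = {1, 2, 3};
    System.out.println("kept as returned: " + collectKeys(keys, false));
    System.out.println("copied first:     " + collectKeys(keys, true));
  }
}
```

`collectKeys(new int[] {1, 2, 3}, false)` returns `[3, 3, 3]` because every kept reference aliases the one entry; with `copy` set to `true` it returns `[1, 2, 3]`.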
