Posted to commits@spark.apache.org by do...@apache.org on 2020/04/22 01:18:57 UTC
[spark] branch master updated: [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cf60384 [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
cf60384 is described below
commit cf6038499d3d5686ab3377f550c2fe507d022fd3
Author: herman <he...@databricks.com>
AuthorDate: Tue Apr 21 18:17:19 2020 -0700
[SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
### What changes were proposed in this pull request?
This PR increases the thread safety of the `BytesToBytesMap`:
- It makes the `iterator()` and `destructiveIterator()` methods use their own `Location` object. This object used to be shared, which caused issues when the map was iterated over in two threads by two different iterators.
- Removes the `safeIterator()` function. This is not needed anymore.
- Improves the documentation of a couple of methods w.r.t. thread-safety.
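The first bullet keeps an allocation-saving pattern (every `next()` returns the same mutable handle) but moves ownership of that handle from the map to the iterator. Below is a minimal, hypothetical model of that ownership change. `CursorMap` and `Cursor` are illustrative names, not Spark's API; `Cursor` stands in for `Location`, and a read-only `int[]` stands in for the map's memory pages.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for BytesToBytesMap (not Spark's API):
// values live in an int array and Cursor plays the role of Location.
final class CursorMap {

  // Mutable handle that is repositioned on each next() instead of reallocated.
  static final class Cursor {
    private final int[] data;
    private int pos = -1;

    Cursor(int[] data) { this.data = data; }

    // Reads the value at the cursor's current position.
    int value() { return data[pos]; }
  }

  private final int[] entries;

  CursorMap(int... entries) { this.entries = entries.clone(); }

  // The change in miniature: each iterator allocates its own Cursor, and
  // next() reuses only that iterator's Cursor, so iterators never overwrite
  // each other's handle.
  Iterator<Cursor> iterator() {
    final Cursor cursor = new Cursor(entries);
    return new Iterator<Cursor>() {
      private int nextPos = 0;

      @Override public boolean hasNext() { return nextPos < entries.length; }

      @Override public Cursor next() {
        if (!hasNext()) throw new NoSuchElementException();
        cursor.pos = nextPos++;
        return cursor;
      }
    };
  }
}
```

Because every call to `iterator()` allocates a fresh `Cursor`, two threads can each drive their own iterator over the same unmodified map. A caller that wants to keep an element past the next `next()` call still has to copy it, since the same handle is repositioned on each call.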
### Why are the changes needed?
It is unexpected that an iterator shares the object it returns with all other iterators. This violates the iterator contract, and it causes issues when iterators over the same map are consumed in different threads.
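The interference does not actually need threads: interleaving two iterators in a single thread shows it deterministically, and concurrent threads only make the same clobbering timing-dependent. In the hypothetical `SharedCursorDemo` below (not Spark code), each iterator tracks its own position, but with `shareCursor = true` both write into one map-owned `Cursor`, roughly as `iterator()` did when it passed the map's `loc` to `MapIterator`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch (not Spark code) contrasting the old behavior, where all
// iterators wrote into one map-owned cursor, with a per-iterator cursor.
final class SharedCursorDemo {

  static final class Cursor {
    int value;
  }

  private final int[] entries;
  private final Cursor shared = new Cursor();  // the old, map-wide handle

  SharedCursorDemo(int... entries) { this.entries = entries.clone(); }

  // shareCursor = true mimics iterator() before the change; false mimics after.
  Iterator<Cursor> iterator(boolean shareCursor) {
    final Cursor cursor = shareCursor ? shared : new Cursor();
    return new Iterator<Cursor>() {
      private int pos = 0;  // each iterator still tracks its own position

      @Override public boolean hasNext() { return pos < entries.length; }

      @Override public Cursor next() {
        if (!hasNext()) throw new NoSuchElementException();
        cursor.value = entries[pos++];
        return cursor;
      }
    };
  }

  // Takes one element from iterator a, advances iterator b twice, and reports
  // what a's already-returned element contains afterwards.
  static int valueSeenByA(SharedCursorDemo map, boolean shareCursor) {
    Iterator<Cursor> a = map.iterator(shareCursor);
    Iterator<Cursor> b = map.iterator(shareCursor);
    Cursor fromA = a.next();
    b.next();
    b.next();
    return fromA.value;
  }
}
```

With entries `10, 20, 30`, `valueSeenByA(map, true)` returns `20`: iterator `b` has overwritten the object that `a` had already handed out. With `shareCursor = false` it returns `10`, which is the behavior the per-iterator `Location` restores.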
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #28286 from hvanhovell/SPARK-31511.
Authored-by: herman <he...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/unsafe/map/BytesToBytesMap.java | 23 +++++++++-------------
.../spark/sql/execution/joins/HashedRelation.scala | 2 +-
2 files changed, 10 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 64c240c..6e02888 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -406,17 +406,10 @@ public final class BytesToBytesMap extends MemoryConsumer {
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
- * If any other lookups or operations are performed on this map while iterating over it, including
- * `lookup()`, the behavior of the returned iterator is undefined.
+ * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+ * the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
- return new MapIterator(numValues, loc, false);
- }
-
- /**
- * Returns a thread safe iterator that iterates of the entries of this map.
- */
- public MapIterator safeIterator() {
return new MapIterator(numValues, new Location(), false);
}
@@ -427,19 +420,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
- * If any other lookups or operations are performed on this map while iterating over it, including
- * `lookup()`, the behavior of the returned iterator is undefined.
+ * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+ * the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
updatePeakMemoryUsed();
- return new MapIterator(numValues, loc, true);
+ return new MapIterator(numValues, new Location(), true);
}
/**
* Looks up a key, and return a {@link Location} handle that can be used to test existence
* and read/write values.
*
- * This function always return the same {@link Location} instance to avoid object allocation.
+ * This function always returns the same {@link Location} instance to avoid object allocation.
+ * This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength) {
safeLookup(keyBase, keyOffset, keyLength, loc,
@@ -451,7 +445,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
* Looks up a key, and return a {@link Location} handle that can be used to test existence
* and read/write values.
*
- * This function always return the same {@link Location} instance to avoid object allocation.
+ * This function always returns the same {@link Location} instance to avoid object allocation.
+ * This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
safeLookup(keyBase, keyOffset, keyLength, loc, hash);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 4001338..13180d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -177,7 +177,7 @@ private[joins] class UnsafeHashedRelation(
}
override def keys(): Iterator[InternalRow] = {
- val iter = binaryMap.safeIterator()
+ val iter = binaryMap.iterator()
new Iterator[InternalRow] {
val unsafeRow = new UnsafeRow(numKeys)