You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/16 06:18:23 UTC
[spark] branch master updated: [SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8476c8b [SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
8476c8b is described below
commit 8476c8b846ffd2622a6bcf1accf9fa55ffbdc0db
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Wed Mar 16 14:17:18 2022 +0800
[SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
### What changes were proposed in this pull request?
UnsafeHashedRelation should serialize numKeys out
### Why are the changes needed?
One case I found was this:
We turned on ReusedExchange(BroadcastExchange), but the returned UnsafeHashedRelation is missing numKeys.
The reason is that TorrentBroadcast._value is currently a SoftReference, so the broadcast value can be garbage-collected and must be re-deserialized on the next access; because numKeys was never written out during serialization, the re-deserialized UnsafeHashedRelation loses its numKeys, which leads to incorrect calculation results.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added an assertion to an existing unit test
Closes #35836 from mcdull-zhang/UnsafeHashed.
Authored-by: mcdull-zhang <wo...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 4 +++-
.../org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 3 +++
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 698e7ed..253f16e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -207,7 +207,7 @@ private[execution] class ValueRowWithKeyIndex {
* A HashedRelation for UnsafeRow, which is backed BytesToBytesMap.
*
* It's serialized in the following format:
- * [number of keys]
+ * [number of keys] [number of fields]
* [size of key] [size of value] [key bytes] [bytes for value]
*/
private[joins] class UnsafeHashedRelation(
@@ -364,6 +364,7 @@ private[joins] class UnsafeHashedRelation(
writeInt: (Int) => Unit,
writeLong: (Long) => Unit,
writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = {
+ writeInt(numKeys)
writeInt(numFields)
// TODO: move these into BytesToBytesMap
writeLong(binaryMap.numKeys())
@@ -397,6 +398,7 @@ private[joins] class UnsafeHashedRelation(
readInt: () => Int,
readLong: () => Long,
readBuffer: (Array[Byte], Int, Int) => Unit): Unit = {
+ numKeys = readInt()
numFields = readInt()
resultRow = new UnsafeRow(numFields)
val nKeys = readLong()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 2462fe3..6c87178 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession {
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)).toArray === data2)
+ // SPARK-38542: UnsafeHashedRelation should serialize numKeys out
+ assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1))
+
val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
hashed2.writeExternal(out2)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org