You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/08 19:43:06 UTC
spark git commit: [SPARK-5707] [SQL] fix serialization of generated
projection
Repository: spark
Updated Branches:
refs/heads/master 3e831a269 -> 74335b310
[SPARK-5707] [SQL] fix serialization of generated projection
Author: Davies Liu <da...@databricks.com>
Closes #7272 from davies/fix_projection and squashes the following commits:
075ef76 [Davies Liu] fix codegen with BroadcastHashJoin
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74335b31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74335b31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74335b31
Branch: refs/heads/master
Commit: 74335b31072951244967f878d8b766cd1bfc2ac6
Parents: 3e831a2
Author: Davies Liu <da...@databricks.com>
Authored: Wed Jul 8 10:43:00 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 8 10:43:00 2015 -0700
----------------------------------------------------------------------
.../apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3 +--
.../org/apache/spark/sql/execution/joins/HashOuterJoin.scala | 2 +-
.../org/apache/spark/sql/execution/joins/HashedRelation.scala | 2 +-
3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 06c244f..ab757fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
// buildHashTable uses code-generated rows as keys, which are not serializable
- val hashed =
- buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+ val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))
sparkContext.broadcast(hashed)
}(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 3337451..0522ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match {
var existingMatchList = hashTable.get(rowKey)
if (existingMatchList == null) {
existingMatchList = new CompactBuffer[InternalRow]()
- hashTable.put(rowKey, existingMatchList)
+ hashTable.put(rowKey.copy(), existingMatchList)
}
existingMatchList += currentRow.copy()
http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index de062c7..6b51f5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -125,7 +125,7 @@ private[joins] object HashedRelation {
val existingMatchList = hashTable.get(rowKey)
val matchList = if (existingMatchList == null) {
val newMatchList = new CompactBuffer[InternalRow]()
- hashTable.put(rowKey, newMatchList)
+ hashTable.put(rowKey.copy(), newMatchList)
newMatchList
} else {
keyIsUnique = false
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org