You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/08 19:43:06 UTC

spark git commit: [SPARK-5707] [SQL] fix serialization of generated projection

Repository: spark
Updated Branches:
  refs/heads/master 3e831a269 -> 74335b310


[SPARK-5707] [SQL] fix serialization of generated projection

Author: Davies Liu <da...@databricks.com>

Closes #7272 from davies/fix_projection and squashes the following commits:

075ef76 [Davies Liu] fix codegen with BroadcastHashJoin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74335b31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74335b31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74335b31

Branch: refs/heads/master
Commit: 74335b31072951244967f878d8b766cd1bfc2ac6
Parents: 3e831a2
Author: Davies Liu <da...@databricks.com>
Authored: Wed Jul 8 10:43:00 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 8 10:43:00 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3 +--
 .../org/apache/spark/sql/execution/joins/HashOuterJoin.scala      | 2 +-
 .../org/apache/spark/sql/execution/joins/HashedRelation.scala     | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 06c244f..ab757fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin(
     // Note that we use .execute().collect() because we don't want to convert data to Scala types
     val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
     // buildHashTable uses code-generated rows as keys, which are not serializable
-    val hashed =
-      buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+    val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))
     sparkContext.broadcast(hashed)
   }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 3337451..0522ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match {
       var existingMatchList = hashTable.get(rowKey)
       if (existingMatchList == null) {
         existingMatchList = new CompactBuffer[InternalRow]()
-        hashTable.put(rowKey, existingMatchList)
+        hashTable.put(rowKey.copy(), existingMatchList)
       }
 
       existingMatchList += currentRow.copy()

http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index de062c7..6b51f5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -125,7 +125,7 @@ private[joins] object HashedRelation {
         val existingMatchList = hashTable.get(rowKey)
         val matchList = if (existingMatchList == null) {
           val newMatchList = new CompactBuffer[InternalRow]()
-          hashTable.put(rowKey, newMatchList)
+          hashTable.put(rowKey.copy(), newMatchList)
           newMatchList
         } else {
           keyIsUnique = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org