You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2014/05/11 20:52:01 UTC

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/734

    [SQL] SPARK-1800 Add broadcast hash join operator

    WIP: A few things remain, but looking for feedback on this approach.
    
     - [ ] Figure out how to configure this.  The immutability of SparkConf is probably not great for things like query hints.
     - [ ] Figure out how to test this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark broadcastHashJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #734
    
----
commit a8420ca0c4cbc5988607d0cd235ffeb2cb51d052
Author: Michael Armbrust <mi...@databricks.com>
Date:   2014-05-11T18:23:02Z

    Copy records in executeCollect to avoid issues with mutable rows.

commit cf6b3818fbe7d1908bcbdc7f18c5773c01d05541
Author: Michael Armbrust <mi...@databricks.com>
Date:   2014-05-11T18:30:56Z

    Split out generic logic for hash joins and create two concrete physical operators: BroadcastHashJoin and ShuffledHashJoin.

commit 76ca4341036b95f71763f631049fdae033990ab5
Author: Michael Armbrust <mi...@databricks.com>
Date:   2014-05-11T18:31:20Z

    A simple strategy that broadcasts tables only when they are found in a configuration hint.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13941218
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    --- End diff --
    
    Yep, we should update the comment "When the operator is constructed" then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by xiaocai00 <gi...@git.apache.org>.
Github user xiaocai00 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13024562
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    Good to know. Thanks for the headsup


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by xiaocai00 <gi...@git.apache.org>.
Github user xiaocai00 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r12868794
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    Hi, will you plan to clean up broadcast variables after the operation or leave it in the context?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by xiaocai00 <gi...@git.apache.org>.
Github user xiaocai00 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13019713
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    Hi Reynold, thanks for the reply. Does spark has a plan to port this PR in to the repo?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-45312913
  
    Should this go in now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42779419
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42779420
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14890/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13941073
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    --- End diff --
    
    The bodies of `BroadcastHashJoin` and of `HashJoin` do not strictly reference `broadcastFuture`, right? If so, the Spark job isn't launched during the constructor?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42895257
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-46764177
  
    Closing in favor of: #1163


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42779387
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13019926
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    We definitely want to merge this PR (assuming you are talking about the broadcast hash join PR).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by xiaocai00 <gi...@git.apache.org>.
Github user xiaocai00 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13022952
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    Yep, the broadcast join. We were experiencing the perf problem when join between a big table with a small table. Look forward to the merge. Do you know when it will approximately be, assuming it goes to 1.1.0?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13941129
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    --- End diff --
    
    It is only run on Line 191 during execute.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42901213
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14907/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42895233
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42779382
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-45357748
  
    We should at least add some tests before merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13941730
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    --- End diff --
    
    Yeah i guess it should be when the RDD is constructed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r12927830
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    In Spark 1.0, with the newly added garbage collection mechanism, when the query plan itself goes out of scope, the broadcast variable should also be cleaned automatically.
    
    Another way we can do this is to have some query context object we pass around the entire physical query plan which tracks the stuff we need to clean up. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13022986
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs and inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    1.0 is already going through voting now so this won't make it into 1.0. It will be in 1.0.1/1.1; However, if you need this functionality, you can just cherry pick this pull request and do a custom build. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/734#issuecomment-42901210
  
    Merged build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus closed the pull request at:

    https://github.com/apache/spark/pull/734


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---