Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2015/09/07 15:10:49 UTC

[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/8642

    [SPARK-9996][SPARK-9997][SQL]Add local expand and NestedLoopJoin operators

    This PR conflicts with #8535 and #8573. I will update it once they are merged.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark expand-nest-join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8642
    
----
commit 5cb1a610b21e84d6a1444fb3d98d2d071f61c4b6
Author: zsxwing <zs...@gmail.com>
Date:   2015-08-31T07:43:55Z

    Add SQLConf to LocalNode

commit 9055d8afcc114734781006f019603e89c65661bc
Author: zsxwing <zs...@gmail.com>
Date:   2015-09-07T12:06:04Z

    Add local expand and NestedLoopJoin operators

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39321800
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala ---
    @@ -0,0 +1,225 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
    +import org.apache.spark.sql.execution.joins.BuildRight
    +
    +class NestedLoopJoinNodeSuite extends LocalNodeTest {
    +
    +  import testImplicits._
    +
    +  def joinSuite(suiteName: String, confPairs: (String, String)*): Unit = {
    --- End diff --
    
    This can be `private`.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39322010
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala ---
    @@ -69,6 +69,23 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
        */
       def close(): Unit
     
    +  /** Specifies whether this operator outputs UnsafeRows */
    +  def outputsUnsafeRows: Boolean = false
    +
    +  /** Specifies whether this operator is capable of processing UnsafeRows */
    +  def canProcessUnsafeRows: Boolean = false
    +
    +  /**
    +   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
    +   * that are not UnsafeRows).
    +   */
    +  def canProcessSafeRows: Boolean = true
    --- End diff --
    
    How many of these can be `protected`?




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139533219
  
      [Test build #42330 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42330/console) for   PR 8642 at commit [`514d3d2`](https://github.com/apache/spark/commit/514d3d206af3d8e0f967746c68fc165ea77bb03c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ExpandNode(`
      * `case class NestedLoopJoinNode(`





[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39311087
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala ---
    @@ -0,0 +1,63 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
    +
    +case class ExpandNode(
    +    conf: SQLConf,
    +    projections: Seq[Seq[Expression]],
    +    output: Seq[Attribute],
    +    child: LocalNode) extends UnaryLocalNode(conf) {
    +
    +  assert(projections.size > 0)
    +
    +  private[this] var result: InternalRow = _
    +  private[this] var idx: Int = _
    +  private[this] var input: InternalRow = _
    +  private[this] var groups: Array[Projection] = _
    +
    +  override def open(): Unit = {
    +    child.open()
    +    idx = -1
    +    groups = projections.map(ee => newProjection(ee, child.output)).toArray
    +  }
    +
    +  override def next(): Boolean = {
    +    if (idx < 0 || idx >= groups.length) {
    +      if (child.next()) {
    +        input = child.fetch()
    +        result = groups(0)(input)
    +        idx = 1
    +        true
    +      } else {
    +        false
    +      }
    +    } else {
    +      result = groups(idx)(input)
    +      idx += 1
    +      true
    +    }
    --- End diff --
    
    How about:
    ```
    if (idx < 0 || idx >= groups.length) {
      if (child.next()) {
        input = child.fetch()
        idx = 0
      } else {
        return false
      }
    }
    result = groups(idx)(input)
    idx += 1
    true
    ```
    There is a little less duplication this way.
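
As a rough illustration of the restructured `next()` above, here is a minimal self-contained sketch. It is not the PR's code: `ExpandSketch` and `drainAll` are hypothetical names, rows are plain `Seq[Int]` values, each "projection" is an ordinary function rather than Spark's `Projection`, and the child is a plain `Iterator` rather than a `LocalNode`.

```scala
// Hypothetical stand-in for ExpandNode: every input row is emitted once per projection.
final class ExpandSketch(
    projections: Array[Seq[Int] => Seq[Int]],
    child: Iterator[Seq[Int]]) {

  require(projections.nonEmpty, "need at least one projection")

  private var result: Seq[Int] = null
  private var input: Seq[Int] = null
  private var idx: Int = -1

  /** Advances to the next output row; returns false once the child is exhausted. */
  def next(): Boolean = {
    if (idx < 0 || idx >= projections.length) {
      // Nothing read yet, or all projections of the current input were emitted.
      if (child.hasNext) {
        input = child.next()
        idx = 0
      } else {
        return false
      }
    }
    result = projections(idx)(input)
    idx += 1
    true
  }

  /** Returns the row produced by the last successful call to next(). */
  def fetch(): Seq[Int] = result

  /** Drains the node into a list, for demonstration only. */
  def drainAll(): List[Seq[Int]] = {
    val out = List.newBuilder[Seq[Int]]
    while (next()) out += fetch()
    out.result()
  }
}
```

With two projections, each input row yields two output rows before the next input row is read, and `input` is always assigned before any projection is applied to it.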




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139510766
  
    Merged build started.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39320829
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    --- End diff --
    
    Also, can you add an explicit type annotation here? It helps readability a bit, since this `flatMap` block is fairly large:
    ```
    val streamedRowMatches: Iterator[InternalRow] = ...
    ```
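
To illustrate the suggestion, the following minimal sketch (not the PR's code) annotates a `val` that is bound to a `flatMap` block. `AnnotatedFlatMapSketch`, `matches`, and the integer data are hypothetical stand-ins for the streamed and build rows.

```scala
object AnnotatedFlatMapSketch {
  /** Pairs each streamed value with the build values it divides evenly. */
  def matches(streamed: Iterator[Int], build: Seq[Int]): Iterator[(Int, Int)] = {
    // The annotation states the element type up front, so a reader does not
    // have to scan to the end of the block to work out what it yields.
    val streamedRowMatches: Iterator[(Int, Int)] = streamed.flatMap { s =>
      build.filter(_ % s == 0).map(b => (s, b))
    }
    streamedRowMatches
  }
}
```

The annotation also makes the compiler reject the block if a later change alters its result type by accident.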




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138323420
  
      [Test build #42095 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42095/console) for   PR 8642 at commit [`9055d8a`](https://github.com/apache/spark/commit/9055d8afcc114734781006f019603e89c65661bc).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) `
      * `case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) `
      * `case class ExpandNode(`
      * `case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)`
      * `case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) `
      * `abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging `
      * `abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) `
      * `abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) `
      * `abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) `
      * `case class NestedLoopJoinNode(`
      * `case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)`
      * `case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])`
      * `case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf) `





[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39316914
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    --- End diff --
    
    Nit: you can merge this import with the one above, e.g. `import org.apache.spark.util.collection.{BitSet, CompactBuffer}`.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140071326
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39235492
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala ---
    @@ -0,0 +1,62 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
    +
    +case class ExpandNode(
    +    conf: SQLConf,
    +    projections: Seq[Seq[Expression]],
    +    output: Seq[Attribute],
    +    child: LocalNode) extends UnaryLocalNode(conf) {
    +
    +  assert(projections.size > 0)
    +
    +  private[this] var result: InternalRow = _
    +  private[this] var idx: Int = _
    +  private[this] var input: InternalRow = _
    +
    +  private[this] var groups: Array[Projection] = _
    +
    +  override def open(): Unit = {
    +    child.open()
    +    idx = -1
    +    groups = projections.map(ee => newProjection(ee, child.output)).toArray
    +  }
    +
    +  override def next(): Boolean = {
    +    idx += 1
    +    if (idx < groups.length) {
    +      result = groups(idx)(input)
    --- End diff --
    
    Hm, at this point `input` hasn't been initialized yet, has it? Did you intend to pass `null` into the projection?
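
The concern can be reproduced in isolation with a small sketch. This is a simplification under stated assumptions: `UninitializedInputSketch` and `firstNext` are hypothetical names, and the projection is a plain function over `Seq[Int]`. How Spark's real `Projection` reacts to a `null` row is not shown in this thread; the plain function used here throws a `NullPointerException`.

```scala
object UninitializedInputSketch {
  type Row = Seq[Int]

  /**
   * Mirrors only the first call to next() in the quoted diff:
   * idx is incremented before any row has been read from the child.
   */
  def firstNext(groups: Array[Row => Row]): Row = {
    val input: Row = null // never assigned, because the child has not been read yet
    var idx = -1          // the value set in open()
    idx += 1
    // idx is now 0, so the first projection is applied to the null input.
    if (idx < groups.length) groups(idx)(input) else null
  }
}
```

Reading a row from the child before applying any projection avoids this path.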




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39321471
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    +        }
    +      case (LeftOuter | FullOuter, BuildLeft) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withRight(rightNulls)
    +          resultProj(joinedRow.withLeft(buildRow))
    +        }
    +      case _ => matchesOrStreamedRowsWithNulls
    +    }
    --- End diff --
    
    Reduce duplicate code:
    ```
    // If we're using outer join, find rows on the build side that didn't match anything
    // and join them with the null row
    lazy val unmatchedBuildRows: Iterator[InternalRow] = {
      var i = 0
      buildRelation.filter { row =>
        val r = !includedBuildTuples.get(i)
        i += 1
        r
      }.iterator
    }
    
    // Keep this lazy as well: `++` takes its argument by name, so the filter above does not
    // run until the streamed side has been consumed and `includedBuildTuples` is complete
    lazy val additionalRows: Iterator[InternalRow] = (joinType, buildSide) match {
      case (RightOuter | FullOuter, BuildRight) =>
        unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
      case (LeftOuter | FullOuter, BuildLeft) =>
        unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
      case _ =>
        Iterator.empty
    }
    
    iterator = streamedRowMatches ++ additionalRows
    ```
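
The overall shape of this outer join can be sketched without Spark. The following is a minimal illustration under stated assumptions, not the PR's implementation: `NestedLoopOuterJoinSketch` and `fullOuterJoin` are hypothetical names, the right side is always the build side, rows are plain values, and null padding is modelled with `None`.

```scala
import scala.collection.mutable

object NestedLoopOuterJoinSketch {
  /** Full outer nested-loop join; `build` is buffered, `streamed` is read once. */
  def fullOuterJoin[L, R](
      streamed: Iterator[L],
      build: IndexedSeq[R])(
      condition: (L, R) => Boolean): Iterator[(Option[L], Option[R])] = {

    // Index i is added once build(i) has matched at least one streamed row.
    val includedBuildTuples = mutable.BitSet.empty

    val streamedRowMatches: Iterator[(Option[L], Option[R])] = streamed.flatMap { s =>
      val matched = mutable.ArrayBuffer.empty[(Option[L], Option[R])]
      var i = 0
      while (i < build.size) {
        if (condition(s, build(i))) {
          matched += ((Some(s), Some(build(i))))
          includedBuildTuples += i
        }
        i += 1
      }
      // Outer-join semantics: keep an unmatched streamed row, padded with None.
      if (matched.isEmpty) matched += ((Some(s), None))
      matched.iterator
    }

    // Filtering on an iterator defers reading the bit set until these rows are
    // pulled, which only happens after the streamed side is exhausted.
    lazy val unmatchedBuildRows: Iterator[(Option[L], Option[R])] =
      build.indices.iterator
        .filter(i => !includedBuildTuples(i))
        .map(i => (None, Some(build(i))))

    // Iterator.++ takes its argument by name, so the right-hand side is not
    // evaluated while streamedRowMatches still has rows.
    streamedRowMatches ++ unmatchedBuildRows
  }
}
```

The ordering matters here: the set of matched build rows is only complete once every streamed row has been processed, so the unmatched build rows have to be computed last.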




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139581381
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39319838
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    --- End diff --
    
    ```
    // If we're using outer join, find rows on the build side that didn't match anything
    // and join them with the null row
    ```


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140040686
  
    @andrewor14 thanks for reviewing. I've updated this PR per your comments.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138323541
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39315088
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala ---
    @@ -108,6 +146,24 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
         }
       }
     
    +  protected def newPredicate(
    +      expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
    --- End diff --
    
    style: can you put `inputSchema` on a new line? Same in L107.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140040052
  
    Merged build started.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138294215
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39319148
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    --- End diff --
    
    I would just call this `streamRowMatches` and add a comment explaining that it can contain null-padded rows if we're using an outer join.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140039993
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140217189
  
    LGTM, thanks. I'm merging this into master.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140041316
  
      [Test build #42421 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42421/consoleFull) for   PR 8642 at commit [`c6e80a2`](https://github.com/apache/spark/commit/c6e80a2f0fa71dc788a754dd9d0f7e8e89bab56f).


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39320498
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    --- End diff --
    
    This could just be `resultProj(joinedRow(leftNulls, buildRow))`; same in L136.
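
The simplification above relies on the joined-row holder offering a two-argument `apply` that sets both sides and returns the holder itself. As a rough sketch of why the two forms are equivalent, the following uses a hypothetical `PairRow` class over strings. It is not Spark's `JoinedRow`, and `PairRowDemo`, `twoStep`, `oneStep`, and `render` are invented for this example.

```scala
// Hypothetical, simplified stand-in for a reusable joined-row holder.
final class PairRow {
  private var left: String = ""
  private var right: String = ""

  // Each setter mutates the holder in place and returns it for chaining.
  def withLeft(l: String): PairRow = { left = l; this }
  def withRight(r: String): PairRow = { right = r; this }

  // Sets both sides at once; equivalent to withLeft followed by withRight.
  def apply(l: String, r: String): PairRow = { left = l; right = r; this }

  def render: String = s"$left|$right"
}

object PairRowDemo {
  // The two-statement form from the original diff.
  def twoStep(row: PairRow, l: String, r: String): String = {
    row.withLeft(l)
    row.withRight(r).render
  }

  // The single-call form the reviewer suggests.
  def oneStep(row: PairRow, l: String, r: String): String = row(l, r).render
}
```

Because the holder is mutated and reused, a caller that stores the result needs its own copy, which is presumably why the matched rows in the diff are followed by `.copy()`.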


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139550058
  
      [Test build #42336 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42336/consoleFull) for   PR 8642 at commit [`4b18418`](https://github.com/apache/spark/commit/4b18418a06d1a34dfb1a0193545b4fd3fb3d5e8d).


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139510723
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139548092
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139511706
  
      [Test build #42330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42330/consoleFull) for   PR 8642 at commit [`514d3d2`](https://github.com/apache/spark/commit/514d3d206af3d8e0f967746c68fc165ea77bb03c).


[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39318962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    --- End diff --
    
    It would be clearer if this looked like:
    ```
    // If this row had no matches and we're using outer join, join it with the null row
    if (!streamRowMatched) {
      (joinType, buildSide) match {
        ...
      }
    }
    ```
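
The restructuring suggested above can be sketched in a self-contained form. This is only an illustration: the `JoinType` and `BuildSide` objects below are simplified stand-ins rather than Spark's classes, rows are modelled as plain `Seq[Any]`, and `padUnmatched` is a hypothetical helper name that does not appear in the PR.

```scala
// Simplified stand-ins for illustration only; these are not Spark's classes.
sealed trait JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

object NullPadding {
  type Row = Seq[Any]

  /** Returns the null-padded row for an unmatched streamed row, if the join type calls for one. */
  def padUnmatched(
      streamRowMatched: Boolean,
      joinType: JoinType,
      buildSide: BuildSide,
      streamedRow: Row,
      leftNulls: Row,
      rightNulls: Row): Option[Row] = {
    // If this row had no matches and we're using an outer join, join it with the null row.
    if (!streamRowMatched) {
      (joinType, buildSide) match {
        case (LeftOuter | FullOuter, BuildRight) => Some(streamedRow ++ rightNulls)
        case (RightOuter | FullOuter, BuildLeft) => Some(leftNulls ++ streamedRow)
        case _ => None
      }
    } else {
      None
    }
  }
}
```

Hoisting the `streamRowMatched` check out of the pattern leaves the match to express only the join-type and build-side combinations, which is the readability gain the comment is after.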




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39319530
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    --- End diff --
    
    Reduce duplicate code:
    ```
    // joinedRow(x, y) updates joinedRow in place, so it can be reused below
    buildSide match {
      case BuildRight => joinedRow(streamedRow, buildRow)
      case BuildLeft => joinedRow(buildRow, streamedRow)
      case _ =>
    }
    if (boundCondition(joinedRow)) {
      matchedRows += resultProj(joinedRow).copy()
      streamRowMatched = true
      includedBuildTuples.set(i)
    }
    ```
    Also, `joinedRow(x, y)` mutates the row in place, so you can use `joinedRow` directly when doing the projection.
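
A runnable sketch of the deduplicated loop, under stated assumptions: `ToyJoinedRow` is a hypothetical stand-in whose `apply` mutates the object and returns it (the behaviour the comment describes), rows are plain `Seq[Any]`, and a `scala.collection.mutable.BitSet` replaces Spark's own `BitSet`. None of these names come from the PR.

```scala
import scala.collection.mutable

object DedupedLoop {
  type Row = Seq[Any]

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Toy stand-in for a reusable joined row: apply() mutates in place and returns this.
  final class ToyJoinedRow {
    private var left: Row = Seq.empty
    private var right: Row = Seq.empty
    def apply(l: Row, r: Row): ToyJoinedRow = { left = l; right = r; this }
    def values: Row = left ++ right
    def copy(): Row = values // immutable snapshot of the current contents
  }

  /** Returns the matched rows for one streamed row and whether any match was found. */
  def matchesFor(
      streamedRow: Row,
      buildRelation: IndexedSeq[Row],
      buildSide: BuildSide,
      condition: ToyJoinedRow => Boolean,
      includedBuildTuples: mutable.BitSet): (Seq[Row], Boolean) = {
    val joinedRow = new ToyJoinedRow
    val matchedRows = mutable.ArrayBuffer.empty[Row]
    var streamRowMatched = false
    var i = 0
    while (i < buildRelation.size) {
      val buildRow = buildRelation(i)
      // Only the argument order depends on the build side.
      buildSide match {
        case BuildRight => joinedRow(streamedRow, buildRow)
        case BuildLeft => joinedRow(buildRow, streamedRow)
      }
      // The shared body runs once, against the already-updated joinedRow.
      if (condition(joinedRow)) {
        matchedRows += joinedRow.copy()
        streamRowMatched = true
        includedBuildTuples += i
      }
      i += 1
    }
    (matchedRows.toSeq, streamRowMatched)
  }
}
```

Because the joined row is updated once per iteration, the condition check and the copy both read the same object, so the two near-identical branches collapse into one.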




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39452893
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    +        }
    +      case (LeftOuter | FullOuter, BuildLeft) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withRight(rightNulls)
    +          resultProj(joinedRow.withLeft(buildRow))
    +        }
    +      case _ => matchesOrStreamedRowsWithNulls
    +    }
    --- End diff --
    
    OK, that's fine.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138323543
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42095/




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139548116
  
    Merged build started.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/8642




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139581383
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42336/




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39381843
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    --- End diff --
    
    > also, don't we need to do a copy here?
    
    It's not necessary, since we don't cache `joinedRow` here. If a caller needs to hold on to multiple rows from this Iterator, they should copy each row themselves.
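
The contract described in this reply (the iterator reuses one row object, and consumers copy when they need to retain rows) can be illustrated with a toy example. `MutableRow` and the helper names are assumptions made for this sketch and are not Spark classes.

```scala
object ReusedRowIterator {
  // Toy mutable row; stands in for a reused row object and is not a Spark class.
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
  }

  /** Emits the same MutableRow instance on every next(), overwriting its contents each time. */
  def reusing(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  /** Retains rows without copying: every retained reference points at the same object. */
  def collectWithoutCopy(values: Seq[Int]): Seq[Int] =
    reusing(values).toList.map(_.value)

  /** Copies each row as it is produced, so every retained row keeps its own contents. */
  def collectWithCopy(values: Seq[Int]): Seq[Int] =
    reusing(values).map(_.copy()).toList.map(_.value)
}
```

A consumer that handles one row at a time never observes the reuse, which is why the copy can be left to callers that buffer rows.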




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140071327
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42421/




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138294238
  
    Merged build started.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39320538
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    --- End diff --
    
    also, don't we need to do a copy here?




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39381973
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala ---
    @@ -69,6 +69,23 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
        */
       def close(): Unit
     
    +  /** Specifies whether this operator outputs UnsafeRows */
    +  def outputsUnsafeRows: Boolean = false
    +
    +  /** Specifies whether this operator is capable of processing UnsafeRows */
    +  def canProcessUnsafeRows: Boolean = false
    +
    +  /**
    +   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
    +   * that are not UnsafeRows).
    +   */
    +  def canProcessSafeRows: Boolean = true
    --- End diff --
    
    These methods will be used outside of LocalNode, for example when building a LocalNode tree, so I didn't make them `protected`.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-140071206
  
      [Test build #42421 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42421/console) for   PR 8642 at commit [`c6e80a2`](https://github.com/apache/spark/commit/c6e80a2f0fa71dc788a754dd9d0f7e8e89bab56f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ExpandNode(`
      * `case class NestedLoopJoinNode(`





[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39321640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    --- End diff --
    
    ```
    // Scan the build relation for each streamed row to look for matches
    ```
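
    For readers unfamiliar with the pattern that comment describes, a nested-loop left outer join over plain in-memory collections might look like the sketch below. `NestedLoopSketch` and `leftOuterJoin` are hypothetical names, and `Option` stands in for the null-padded row; this is not the code in the PR.

    ```scala
    object NestedLoopSketch {
      // Left outer join: every streamed row is emitted at least once.
      def leftOuterJoin[A, B](streamed: Iterator[A], build: IndexedSeq[B])(
          cond: (A, B) => Boolean): Iterator[(A, Option[B])] =
        streamed.flatMap { s =>
          // Scan the build relation for each streamed row to look for matches
          val matches = build.iterator
            .filter(b => cond(s, b))
            .map(b => (s, Some(b): Option[B]))
            .toVector
          // No match: emit the streamed row padded with an empty build side
          if (matches.nonEmpty) matches.iterator
          else Iterator.single((s, Option.empty[B]))
        }
    }
    ```

    The cost is proportional to the product of the two input sizes, which is why the build side is materialized once up front and only the streamed side is iterated lazily.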




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139670309
  
    @zsxwing I left some suggestions on reducing code duplication, but otherwise this looks pretty good.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39381902
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    +    val resultProj = genResultProjection
    +    streamed.open()
    +
    +    val matchesOrStreamedRowsWithNulls = streamed.asIterator.flatMap { streamedRow =>
    +      val matchedRows = new CompactBuffer[InternalRow]
    +
    +      var i = 0
    +      var streamRowMatched = false
    +
    +      while (i < buildRelation.size) {
    +        val buildRow = buildRelation(i)
    +        buildSide match {
    +          case BuildRight if boundCondition(joinedRow(streamedRow, buildRow)) =>
    +            matchedRows += resultProj(joinedRow(streamedRow, buildRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case BuildLeft if boundCondition(joinedRow(buildRow, streamedRow)) =>
    +            matchedRows += resultProj(joinedRow(buildRow, streamedRow)).copy()
    +            streamRowMatched = true
    +            includedBuildTuples.set(i)
    +          case _ =>
    +        }
    +        i += 1
    +      }
    +
    +      (streamRowMatched, joinType, buildSide) match {
    +        case (false, LeftOuter | FullOuter, BuildRight) =>
    +          matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
    +        case (false, RightOuter | FullOuter, BuildLeft) =>
    +          matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
    +        case _ =>
    +      }
    +
    +      matchedRows.iterator
    +    }
    +
    +    iterator = (joinType, buildSide) match {
    +      case (RightOuter | FullOuter, BuildRight) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withLeft(leftNulls)
    +          resultProj(joinedRow.withRight(buildRow))
    +        }
    +      case (LeftOuter | FullOuter, BuildLeft) =>
    +        var i = 0
    +        matchesOrStreamedRowsWithNulls ++ buildRelation.filter { row =>
    +          val r = !includedBuildTuples.get(i)
    +          i += 1
    +          r
    +        }.iterator.map { buildRow =>
    +          joinedRow.withRight(rightNulls)
    +          resultProj(joinedRow.withLeft(buildRow))
    +        }
    +      case _ => matchesOrStreamedRowsWithNulls
    +    }
    --- End diff --
    
    I made a small change to your suggestion to avoid using `Iterator.empty[InternalRow]`.
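
    As an aside on the duplication in the two outer-join branches above, the "build rows that never matched" step could be factored into one helper. The sketch below is only an illustration under assumptions: `UnmatchedBuildRows` and `unmatched` are hypothetical names, and it uses Scala's `mutable.BitSet` rather than Spark's own `BitSet`.

    ```scala
    import scala.collection.mutable

    object UnmatchedBuildRows {
      // Returns the build rows whose index was never marked as matched.
      // Pairing each row with its index avoids a mutable counter in the filter closure.
      def unmatched[T](buildRelation: IndexedSeq[T], matched: mutable.BitSet): Iterator[T] =
        buildRelation.iterator.zipWithIndex.collect {
          case (row, i) if !matched.contains(i) => row
        }
    }
    ```

    Each branch would then only differ in which side it pads with nulls before projecting the result.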




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39321952
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala ---
    @@ -0,0 +1,225 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
    +import org.apache.spark.sql.execution.joins.BuildRight
    +
    +class NestedLoopJoinNodeSuite extends LocalNodeTest {
    +
    +  import testImplicits._
    +
    +  def joinSuite(suiteName: String, confPairs: (String, String)*): Unit = {
    +    test(s"$suiteName: left outer join") {
    +      withSQLConf(confPairs: _*) {
    +        checkAnswer2(
    +          upperCaseData,
    +          lowerCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              LeftOuter,
    +              Some((upperCaseData.col("N") === lowerCaseData.col("n")).expr))
    +          ),
    +          upperCaseData.join(lowerCaseData, $"n" === $"N", "left").collect())
    +
    +        checkAnswer2(
    +          upperCaseData,
    +          lowerCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              LeftOuter,
    +              Some(
    +                (upperCaseData.col("N") === lowerCaseData.col("n") &&
    +                  lowerCaseData.col("n") > 1).expr))
    +          ),
    +          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left").collect())
    +
    +        checkAnswer2(
    +          upperCaseData,
    +          lowerCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              LeftOuter,
    +              Some(
    +                (upperCaseData.col("N") === lowerCaseData.col("n") &&
    +                  upperCaseData.col("N") > 1).expr))
    +          ),
    +          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left").collect())
    +
    +        checkAnswer2(
    +          upperCaseData,
    +          lowerCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              LeftOuter,
    +              Some(
    +                (upperCaseData.col("N") === lowerCaseData.col("n") &&
    +                  lowerCaseData.col("l") > upperCaseData.col("L")).expr))
    +          ),
    +          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left").collect())
    +      }
    +    }
    +
    +    test(s"$suiteName: right outer join") {
    +      withSQLConf(confPairs: _*) {
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              RightOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N", "right").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              RightOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
    +                lowerCaseData.col("n") > 1).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              RightOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
    +                upperCaseData.col("N") > 1).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              RightOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
    +                lowerCaseData.col("l") > upperCaseData.col("L")).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right").collect())
    +      }
    +    }
    +
    +    test(s"$suiteName: full outer join") {
    +      withSQLConf(confPairs: _*) {
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              FullOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N", "full").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              FullOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
    +                lowerCaseData.col("n") > 1).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "full").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    +              FullOuter,
    +              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
    +                upperCaseData.col("N") > 1).expr))
    +          ),
    +          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "full").collect())
    +
    +        checkAnswer2(
    +          lowerCaseData,
    +          upperCaseData,
    +          wrapForUnsafe(
    +            (node1, node2) => NestedLoopJoinNode(
    +              conf,
    +              node1,
    +              node2,
    +              BuildRight,
    --- End diff --
    
    It looks like all of these tests only exercise `BuildRight`. Shall we add some tests for `BuildLeft` as well? It might be as simple as adding `buildSide` as a parameter to `joinSuite`.
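
    A rough sketch of that parameterization, assuming nothing about the real test harness: `BuildSide` is redefined locally and `test` is a hypothetical stand-in for ScalaTest's `test(name)(body)` that only records names, so the snippet runs on its own.

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Local stand-ins for Spark's BuildSide hierarchy (assumption, for self-containment).
    sealed trait BuildSide
    case object BuildLeft extends BuildSide
    case object BuildRight extends BuildSide

    object JoinSuiteSketch {
      val registered = ArrayBuffer.empty[String]

      // Hypothetical replacement for ScalaTest's test(name)(body).
      def test(name: String)(body: => Unit): Unit = { registered += name; body }

      // buildSide is now a parameter instead of a hard-coded BuildRight.
      def joinSuite(suiteName: String, buildSide: BuildSide): Unit = {
        for (joinType <- Seq("left outer", "right outer", "full outer")) {
          test(s"$suiteName: $joinType join ($buildSide)") {
            // The real suite would build NestedLoopJoinNode(..., buildSide, ...) here.
          }
        }
      }

      def registerAll(): Unit =
        for (side <- Seq[BuildSide](BuildLeft, BuildRight)) joinSuite("general", side)
    }
    ```

    Calling `joinSuite` once per build side doubles the coverage without duplicating any of the `checkAnswer2` bodies.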




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8642#discussion_r39319569
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.local
    +
    +import org.apache.spark.sql.SQLConf
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
    +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
    +import org.apache.spark.util.collection.BitSet
    +import org.apache.spark.util.collection.CompactBuffer
    +
    +case class NestedLoopJoinNode(
    +    conf: SQLConf,
    +    left: LocalNode,
    +    right: LocalNode,
    +    buildSide: BuildSide,
    +    joinType: JoinType,
    +    condition: Option[Expression]) extends BinaryLocalNode(conf) {
    +
    +  override def output: Seq[Attribute] = {
    +    joinType match {
    +      case LeftOuter =>
    +        left.output ++ right.output.map(_.withNullability(true))
    +      case RightOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output
    +      case FullOuter =>
    +        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    +      case x =>
    +        throw new IllegalArgumentException(
    +          s"NestedLoopJoin should not take $x as the JoinType")
    +    }
    +  }
    +
    +  private[this] def genResultProjection: InternalRow => InternalRow = {
    +    if (outputsUnsafeRows) {
    +      UnsafeProjection.create(schema)
    +    } else {
    +      identity[InternalRow]
    +    }
    +  }
    +
    +  private[this] var currentRow: InternalRow = _
    +
    +  private[this] var iterator: Iterator[InternalRow] = _
    +
    +  override def open(): Unit = {
    +    val (streamed, build) = buildSide match {
    +      case BuildRight => (left, right)
    +      case BuildLeft => (right, left)
    +    }
    +    build.open()
    +    val buildRelation = new CompactBuffer[InternalRow]
    +    while (build.next()) {
    +      buildRelation += build.fetch().copy()
    +    }
    +    build.close()
    +
    +    val boundCondition =
    +      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +
    +    val leftNulls = new GenericMutableRow(left.output.size)
    +    val rightNulls = new GenericMutableRow(right.output.size)
    +    val joinedRow = new JoinedRow
    +    val includedBuildTuples = new BitSet(buildRelation.size)
    --- End diff --
    
    Suggest renaming this to `joinedBuildTuples`.




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-138296051
  
      [Test build #42095 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42095/consoleFull) for   PR 8642 at commit [`9055d8a`](https://github.com/apache/spark/commit/9055d8afcc114734781006f019603e89c65661bc).




[GitHub] spark pull request: [SPARK-9996][SPARK-9997][SQL]Add local expand ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8642#issuecomment-139581215
  
      [Test build #42336 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42336/console) for   PR 8642 at commit [`4b18418`](https://github.com/apache/spark/commit/4b18418a06d1a34dfb1a0193545b4fd3fb3d5e8d).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ExpandNode(`
      * `case class NestedLoopJoinNode(`


