Posted to reviews@spark.apache.org by joseph-torres <gi...@git.apache.org> on 2017/10/06 20:47:02 UTC

[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19452

    [SPARK-22136][SS] Evaluate one-sided conditions early in stream-stream joins.

    ## What changes were proposed in this pull request?
    
    Evaluate one-sided conditions early in stream-stream joins.
    
    This is in addition to normal filter pushdown: integrating it with the join logic lets it also take effect in outer join scenarios. As a result, rows that can never satisfy the join condition won't clog up the state.
    
    ## How was this patch tested?
    new unit tests
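A minimal sketch of the condition split described above, in Scala, using a hypothetical `Conjunct` model (column names plus a determinism flag) instead of Catalyst's `Expression`. It follows the rule discussed later in this thread: conjuncts are bucketed only up to the first nondeterministic one, and everything from that point on is evaluated after the join. In this sketch the right-only bucket is taken from what the left bucket did not claim, so no conjunct lands in two buckets.

```scala
// Hypothetical stand-in for a Catalyst conjunct: NOT Spark's Expression API.
final case class Conjunct(sql: String, references: Set[String], deterministic: Boolean = true)

final case class SplitPredicates(
    leftSideOnly: Seq[Conjunct],
    rightSideOnly: Seq[Conjunct],
    bothSides: Seq[Conjunct])

def splitCondition(
    conjuncts: Seq[Conjunct],
    leftOutput: Set[String],
    rightOutput: Set[String]): SplitPredicates = {
  // Everything from the first nondeterministic conjunct onward stays in the
  // post-join bucket, preserving the original short-circuit evaluation order.
  val (candidates, fromFirstNonDeterministic) = conjuncts.span(_.deterministic)
  // Conjuncts needing only left-side columns can be checked before storing a left row.
  val (leftOnly, nonLeft) = candidates.partition(_.references.subsetOf(leftOutput))
  // Of the rest, those needing only right-side columns can be checked before storing a right row.
  val (rightOnly, needsBoth) = nonLeft.partition(_.references.subsetOf(rightOutput))
  SplitPredicates(leftOnly, rightOnly, needsBoth ++ fromFirstNonDeterministic)
}

val split = splitCondition(
  Seq(
    Conjunct("a > 10", Set("a")),
    Conjunct("c < 5", Set("c")),
    Conjunct("a = c", Set("a", "c")),
    Conjunct("rand() < 0.5", Set.empty, deterministic = false),
    Conjunct("b > 0", Set("b"))), // left-only, but after a nondeterministic conjunct
  leftOutput = Set("a", "b"),
  rightOutput = Set("c", "d"))

println(split.leftSideOnly.map(_.sql))  // List(a > 10)
println(split.bothSides.map(_.sql))     // List(a = c, rand() < 0.5, b > 0)
```

Note that `b > 0` ends up in the post-join bucket even though it only references the left side, because it follows a nondeterministic conjunct.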


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22136

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19452.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19452
    
----
commit c90fb3cb1e112edeacb4be2604a7b628f55697f4
Author: Jose Torres <jo...@databricks.com>
Date:   2017-10-06T20:44:20Z

    Evaluate one-sided conditions early in stream-stream joins.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144435689
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -221,43 +237,36 @@ case class StreamingSymmetricHashJoinExec(
         //    matching new left input with new right input, since the new left input has become stored
         //    by that point. This tiny asymmetry is necessary to avoid duplication.
         val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(input).withRight(matched)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(input).withRight(matched)
         }
         val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(matched).withRight(input)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(matched).withRight(input)
         }
     
    -    // Filter the joined rows based on the given condition.
    -    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
    -
         // We need to save the time that the inner join output iterator completes, since outer join
         // output counts as both update and removal time.
         var innerOutputCompletionTimeNs: Long = 0
         def onInnerOutputCompletion = {
           innerOutputCompletionTimeNs = System.nanoTime
         }
    -    val filteredInnerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    -      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction), onInnerOutputCompletion)
    +    val innerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    +      (leftOutputIter ++ rightOutputIter), onInnerOutputCompletion)
     
         def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
    --- End diff --
    
    these methods can be moved into the cases where they are required. That would make the code easier to segment.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82528/
    Test PASSed.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437411
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -355,6 +362,8 @@ case class StreamingSymmetricHashJoinExec(
           inputAttributes: Seq[Attribute],
           joinKeys: Seq[Expression],
           inputIter: Iterator[InternalRow],
    +      preJoinFilter: InternalRow => Boolean,
    +      postJoinFilter: InternalRow => Boolean,
    --- End diff --
    
    add docs. this is complicated enough now to justify docs.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437332
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -402,17 +411,27 @@ case class StreamingSymmetricHashJoinExec(
     
           nonLateRows.flatMap { row =>
             val thisRow = row.asInstanceOf[UnsafeRow]
    -        val key = keyGenerator(thisRow)
    -        val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
    -          generateJoinedRow(thisRow, thatRow)
    -        }
    -        val shouldAddToState = // add only if both removal predicates do not match
    -          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
    -        if (shouldAddToState) {
    -          joinStateManager.append(key, thisRow)
    -          updatedStateRowsCount += 1
    +        if (preJoinFilter(thisRow)) {
    --- End diff --
    
    add docs explaining what this condition is for.
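To illustrate what the `preJoinFilter` check guards, here is a simplified sketch using plain Scala collections. `Row`, `OneSideJoiner`, and the in-memory map state are hypothetical stand-ins, not Spark's classes, and watermark-based state eviction (along with the null-padded output it produces for stored rows that never matched) is omitted. A row that fails the pre-join filter can never satisfy the full condition, so it is never stored: on the preserved side of an outer join it yields its null-padded row immediately, and on an inner side it yields nothing.

```scala
import scala.collection.mutable

// Hypothetical simplified row: a join key plus one payload column.
final case class Row(key: Int, value: Int)

// Hypothetical stand-in for one side of a symmetric hash join (NOT Spark's class).
// `isOuterSide` is true for the preserved side of an outer join.
final class OneSideJoiner(isOuterSide: Boolean) {
  // Buffered rows for this side, grouped by join key.
  val state: mutable.Map[Int, mutable.Buffer[Row]] = mutable.Map.empty

  def storeAndJoin(
      input: Iterator[Row],
      otherSide: OneSideJoiner,
      preJoinFilter: Row => Boolean,
      postJoinFilter: ((Row, Option[Row])) => Boolean): Iterator[(Row, Option[Row])] =
    input.flatMap { row =>
      if (preJoinFilter(row)) {
        // The row might still match: join it with the other side's buffered rows,
        // finish the condition with the post-join filter, then buffer the row.
        val matches = otherSide.state
          .getOrElse(row.key, mutable.Buffer.empty[Row])
          .map(that => (row, Option(that)))
          .filter(postJoinFilter)
          .toList
        state.getOrElseUpdate(row.key, mutable.Buffer.empty[Row]) += row
        matches.iterator
      } else if (isOuterSide) {
        // Can never match: emit the null-padded outer row now and skip the state.
        Iterator((row, Option.empty[Row]))
      } else {
        // Can never match on an inner side: no output, nothing stored.
        Iterator[(Row, Option[Row])]()
      }
    }
}

val left = new OneSideJoiner(isOuterSide = true) // preserved side of a left outer join
val right = new OneSideJoiner(isOuterSide = false)
right.state.getOrElseUpdate(1, mutable.Buffer.empty[Row]) += Row(1, 3)

// Hypothetical condition: left.value > 10 AND left.value > right.value
val out = left.storeAndJoin(
  Iterator(Row(1, 20), Row(1, 7)),
  right,
  preJoinFilter = (r: Row) => r.value > 10,
  postJoinFilter = { case (l, r) => r.exists(_.value < l.value) }).toList
// out: List((Row(1,20),Some(Row(1,3))), (Row(1,7),None)); Row(1, 7) is never stored.
```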


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144436114
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
     
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.catalyst.InternalRow
    -import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, Expression, GenericInternalRow, JoinedRow, Literal, NamedExpression, PreciseTimestampConversion, UnsafeProjection, UnsafeRow}
    +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BindReferences, Expression, GenericInternalRow, JoinedRow, Literal, NamedExpression, PreciseTimestampConversion, PredicateHelper, UnsafeProjection, UnsafeRow}
    --- End diff --
    
    these might be unnecessary


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82817 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82817/testReport)** for PR 19452 at commit [`d4984c7`](https://github.com/apache/spark/commit/d4984c7b169a1f9c72d160646892063bb7e18c39).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144148695
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    Nondeterministic conjuncts don't commute across && because Spark uses short-circuit evaluation. (That is, "udf('val) == 0 && false" will cause udf to be evaluated, while "false && udf('val) == 0" will not.) This behavior is copied from how predicate pushdown handles nondeterminism.
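The ordering argument can be checked with plain Scala `Boolean` `&&`, which short-circuits the same way. Here `udf` is a hypothetical side-effecting function that counts its calls; it stands in for a nondeterministic UDF and is not a Spark API.

```scala
// Hypothetical side-effecting "UDF": counts how many times it is invoked.
var udfCalls = 0
def udf(v: Int): Int = { udfCalls += 1; v % 2 }

// The UDF is on the left, so it always runs before && sees `false`.
val first = udf(3) == 0 && false
val callsWhenUdfFirst = udfCalls

udfCalls = 0
// `false` is on the left, so && short-circuits and the UDF never runs.
val second = false && udf(3) == 0
val callsWhenUdfSecond = udfCalls

println(s"$callsWhenUdfFirst vs $callsWhenUdfSecond") // 1 vs 0
```

Both expressions evaluate to `false`, but only the first one triggers the side effect, which is why reordering a nondeterministic conjunct is not safe.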


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144620005
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
    +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
    +import org.apache.spark.sql.types.DataTypes
    +
    +class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
    +  import org.apache.spark.sql.functions._
    +
    +  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
    +  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
    +  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
    +  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
    +  val colA = new Column(attributeA)
    +  val colB = new Column(attributeB)
    +  val colC = new Column(attributeC)
    +  val colD = new Column(attributeD)
    +
    +  val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq())
    +  val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq())
    +
    +  test("empty") {
    +    val split = JoinConditionSplitPredicates(None, left, right)
    +    assert(split.leftSideOnly.isEmpty)
    +    assert(split.rightSideOnly.isEmpty)
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.isEmpty)
    +  }
    +
    +  test("only literals") {
    +    // Literal-only conjuncts end up on the left side because that's the first bucket they fit in.
    +    // There's no semantic reason they couldn't be in any bucket.
    +    val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === lit(-1)).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.contains(predicate))
    +    assert(split.rightSideOnly.isEmpty)
    --- End diff --
    
    I don't want to get into duplicating predicates here for the sake of symmetry. I could move literals to the post-join part maybe?
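Why literal-only conjuncts land in the left bucket: a conjunct with no column references has an empty reference set, and the empty set is a subset of every side's output, so whichever bucket is checked first claims it. The sketch below (hypothetical column names, plain Scala `Set`s) shows that, plus one possible way to implement the suggested alternative of routing reference-free conjuncts to the post-join part.

```scala
// Hypothetical output columns of each side.
val leftOutput = Set("a", "b")
val rightOutput = Set("c", "d")
// A literal-only conjunct such as `1 < 5` references no columns.
val literalOnly = Set.empty[String]

// The empty set is a subset of every set, so it "fits" both one-sided buckets.
val fitsLeft = literalOnly.subsetOf(leftOutput)
val fitsRight = literalOnly.subsetOf(rightOutput)

// Alternative: only send a conjunct to a one-sided bucket if it references
// at least one column; reference-free conjuncts go to the post-join bucket.
def bucket(refs: Set[String]): String =
  if (refs.nonEmpty && refs.subsetOf(leftOutput)) "leftSideOnly"
  else if (refs.nonEmpty && refs.subsetOf(rightOutput)) "rightSideOnly"
  else "bothSides"

println(bucket(literalOnly)) // bothSides
```

The trade-off is that a constant-false conjunct then no longer keeps any rows out of state; in the left bucket it at least filters left rows before they are stored, although right rows are still buffered.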


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82817 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82817/testReport)** for PR 19452 at commit [`d4984c7`](https://github.com/apache/spark/commit/d4984c7b169a1f9c72d160646892063bb7e18c39).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82794 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82794/testReport)** for PR 19452 at commit [`caf8638`](https://github.com/apache/spark/commit/caf8638107a297bed2312c4c6a96d7e343defb64).


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r143807269
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    --- End diff --
    
    fix indent


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144099476
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    --- End diff --
    
    conjunct which refers to only the left side
    OR
    conjuncts which require references from only the left side of the join


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144371994
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    can you point to the predicate pushdown code?


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144128814
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    +
    +          val (leftConjuncts, nonLeftConjuncts) = candidates.partition { cond =>
    +            cond.references.subsetOf(left.outputSet)
    +          }
    +
    +          val (rightConjuncts, remainingConjuncts) = candidates.partition { cond =>
    +            cond.references.subsetOf(right.outputSet)
    +          }
    +
    +          (leftConjuncts.reduceOption(And), rightConjuncts.reduceOption(And),
    --- End diff --
    
    super nit: split into one param per line, easier to read.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437858
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,68 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param leftSideOnly Deterministic conjuncts which reference only the left side of the join.
    +   * @param rightSideOnly Deterministic conjuncts which reference only the right side of the join.
    +   * @param bothSides Conjuncts which are nondeterministic, occur after a nondeterministic conjunct,
    +   *                  or reference both left and right sides of the join.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    leftSideOnly: Option[Expression],
    +    rightSideOnly: Option[Expression],
    +    bothSides: Option[Expression],
    +    full: Option[Expression]) {}
    --- End diff --
    
    incorrect indent.
    add a pretty toString so that when we print the plan it shows up nicely as
    `...., condition = [ leftOnly = ..., rightOnly = ..., both = ... ]`
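A minimal sketch of the requested `toString`, assuming each bucket is rendered as a SQL string. `SplitPredicatesSketch` is a hypothetical simplified stand-in with plain `Option[String]` fields rather than Catalyst `Expression`s, and printing `none` for an empty bucket is an arbitrary choice.

```scala
// Hypothetical stand-in for JoinConditionSplitPredicates with pre-rendered SQL strings.
final case class SplitPredicatesSketch(
    leftSideOnly: Option[String],
    rightSideOnly: Option[String],
    bothSides: Option[String],
    full: Option[String]) {

  // Render one bucket, or "none" when it is empty.
  private def show(part: Option[String]): String = part.getOrElse("none")

  // Overriding toString replaces the default case-class rendering in plan output.
  override def toString: String =
    s"condition = [ leftOnly = ${show(leftSideOnly)}, " +
      s"rightOnly = ${show(rightSideOnly)}, both = ${show(bothSides)} ]"
}

val rendered =
  SplitPredicatesSketch(Some("a > 10"), None, Some("a = c"), Some("a > 10 AND a = c")).toString
println(rendered) // condition = [ leftOnly = a > 10, rightOnly = none, both = a = c ]
```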


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82526 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82526/testReport)** for PR 19452 at commit [`c90fb3c`](https://github.com/apache/spark/commit/c90fb3cb1e112edeacb4be2604a7b628f55697f4).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82706/
    Test FAILed.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82748 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82748/testReport)** for PR 19452 at commit [`0a753ed`](https://github.com/apache/spark/commit/0a753ed36018efe0be5533084fc7b6040586cbb5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144708446
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -349,14 +350,35 @@ case class StreamingSymmetricHashJoinExec(
       /**
        * Internal helper class to consume input rows, generate join output rows using other sides
        * buffered state rows, and finally clean up this sides buffered state rows
    +   *
    +   * @param joinSide The JoinSide - either left or right.
    +   * @param inputAttributes The input attributes for this side of the join.
    +   * @param joinKeys The join keys.
    +   * @param inputIter The iterator of input rows on this side to be joined.
    +   * @param preJoinFilterExpr A filter over rows on this side. This filter rejects rows that could
    +   *                          never pass the overall join condition no matter what other side row
    +   *                          they're joined with.
    +   * @param postJoinFilterExpr A filter over joined rows. This filter completes the application of
    +   *                           the overall join condition, assuming that preJoinFilter on both sides
    +   *                           of the join has already been passed.
    +   * @param stateWatermarkPredicate The state watermark predicate. See
    +   *                                [[StreamingSymmetricHashJoinExec]] for further description of
    +   *                                state watermarks.
        */
       private class OneSideHashJoiner(
           joinSide: JoinSide,
           inputAttributes: Seq[Attribute],
           joinKeys: Seq[Expression],
           inputIter: Iterator[InternalRow],
    +      preJoinFilterExpr: Option[Expression],
    +      postJoinFilterExpr: Option[Expression],
           stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]) {
     
    +    // Filter the joined rows based on the given condition.
    +    val preJoinFilter =
    +      newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _
    +    val postJoinFilter = newPredicate(postJoinFilterExpr.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    This is incorrect. The schema of the rows on which this filter will be applied is `left.output ++ right.output`. You need to apply another projection to put the JoinedRow into an UnsafeRow with the schema `output`.
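A Spark-free sketch of the point above. It rests on some loud assumptions: rows are modeled as `Vector[Any]`, schemas are modeled as sequences of column names, and `PostJoinBinding`, `joinedRow`, and `bindGreaterThan` are hypothetical helpers, not Catalyst APIs. A post-join predicate reads columns by ordinal. It therefore has to be bound against the layout of the row it actually receives, which is the left fields followed by the right fields. If it is bound against any other layout, such as the reordered one in the usage note, it reads the wrong slots.

```scala
// Hypothetical, Spark-free model: a row is a Vector[Any], a schema is a Seq of column names.
object PostJoinBinding {
  type Row = Vector[Any]

  // Mimics a JoinedRow: the left row's fields followed by the right row's fields.
  def joinedRow(left: Row, right: Row): Row = left ++ right

  // "Binds" the predicate `lhs > rhs` to ordinals in the given schema. The result is only
  // correct if `schema` matches the layout of the rows the predicate is later applied to.
  def bindGreaterThan(schema: Seq[String], lhs: String, rhs: String): Row => Boolean = {
    val i = schema.indexOf(lhs)
    val j = schema.indexOf(rhs)
    require(i >= 0 && j >= 0, s"$lhs or $rhs not found in schema $schema")
    (row: Row) => row(i).asInstanceOf[Int] > row(j).asInstanceOf[Int]
  }
}
```

Here `bindGreaterThan(Seq("a", "b", "c", "d"), "a", "c")` accepts `joinedRow(Vector(5, 0), Vector(3, 9))`. The same predicate bound against the illustrative order `Seq("c", "d", "a", "b")` rejects it. That reordering only demonstrates ordinal binding and says nothing about how `output` is actually laid out.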


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82710/
    Test PASSed.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82794 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82794/testReport)** for PR 19452 at commit [`caf8638`](https://github.com/apache/spark/commit/caf8638107a297bed2312c4c6a96d7e343defb64).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82818/
    Test PASSed.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144436846
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
         val updateStartTimeNs = System.nanoTime
         val joinedRow = new JoinedRow
     
    +    // Filter the joined rows based on the given condition.
    +    val leftPreJoinFilter =
    +      newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    Also, you can dedup the code a little bit by pushing the code generation into the Joiner. This would also be consistent with the other inputs to the Joiner (passed as expressions, not as functions).
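A minimal sketch of the suggested dedup. It uses a toy expression tree (`Expr`, `TrueLit`, `ColGt`, `And`) in place of Catalyst expressions, and a `Compile.predicate` function in place of code generation. All of these names, along with `SideJoiner`, are hypothetical. The joiner receives optional filter expressions, like its other inputs, and compiles each one exactly once. A missing filter defaults to an always-true predicate, in the same spirit as `getOrElse(Literal(true))`.

```scala
object JoinerFilters {
  // Hypothetical mini expression tree standing in for Catalyst expressions.
  sealed trait Expr
  case object TrueLit extends Expr
  final case class ColGt(col: Int, value: Int) extends Expr // row(col) > value
  final case class And(l: Expr, r: Expr) extends Expr

  object Compile {
    // Stands in for code generation: turn an expression into a row predicate once.
    def predicate(e: Expr): Vector[Int] => Boolean = e match {
      case TrueLit     => (_: Vector[Int]) => true
      case ColGt(c, v) => (row: Vector[Int]) => row(c) > v
      case And(l, r) =>
        val pl = predicate(l)
        val pr = predicate(r)
        (row: Vector[Int]) => pl(row) && pr(row)
    }
  }

  // Each side's joiner takes expressions and compiles them itself, so the caller no longer
  // repeats the same setup for the left and right sides.
  final class SideJoiner(preJoinFilterExpr: Option[Expr], postJoinFilterExpr: Option[Expr]) {
    val preJoinFilter: Vector[Int] => Boolean =
      Compile.predicate(preJoinFilterExpr.getOrElse(TrueLit))
    val postJoinFilter: Vector[Int] => Boolean =
      Compile.predicate(postJoinFilterExpr.getOrElse(TrueLit))
  }
}
```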


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144651101
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
    +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
    +import org.apache.spark.sql.types.DataTypes
    +
    +class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
    +  import org.apache.spark.sql.functions._
    +
    +  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
    +  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
    +  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
    +  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
    +  val colA = new Column(attributeA)
    +  val colB = new Column(attributeB)
    +  val colC = new Column(attributeC)
    +  val colD = new Column(attributeD)
    +
    +  val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq())
    +  val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq())
    +
    +  test("empty") {
    +    val split = JoinConditionSplitPredicates(None, left, right)
    +    assert(split.leftSideOnly.isEmpty)
    +    assert(split.rightSideOnly.isEmpty)
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.isEmpty)
    +  }
    +
    +  test("only literals") {
    +    // Literal-only conjuncts end up on the left side because that's the first bucket they fit in.
    +    // There's no semantic reason they couldn't be in any bucket.
    +    val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === lit(-1)).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.contains(predicate))
    +    assert(split.rightSideOnly.isEmpty)
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.contains(predicate))
    +  }
    +
    +  test("only left") {
    +    val predicate = (colA > lit(1) && colB > lit(5) && colA < colB).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.contains(predicate))
    +    assert(split.rightSideOnly.isEmpty)
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.contains(predicate))
    +  }
    +
    +  test("only right") {
    +    val predicate = (colC > lit(1) && colD > lit(5) && colD < colC).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.isEmpty)
    +    assert(split.rightSideOnly.contains(predicate))
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.contains(predicate))
    +  }
    +
    +  test("mixed conjuncts") {
    +    val predicate = (colA > colB && colC > colD && colA === colC && lit(1) === lit(1)).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.contains((colA > colB && lit(1) === lit(1)).expr))
    +    assert(split.rightSideOnly.contains((colC > colD).expr))
    +    assert(split.bothSides.contains((colA === colC).expr))
    +    assert(split.full.contains(predicate))
    +  }
    --- End diff --
    
    Shouldn't we also test right-only conditions for left outer joins, and vice versa?
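A sketch of what such a test might assert. It uses a hypothetical in-memory left outer equi-join; `LeftOuterRightOnly`, `L`, and `R` are illustrative names, not Spark code. The right-only conjunct `c > 10` is applied to right rows before they are kept. A left row whose only candidate match was filtered out must still be emitted, padded with `None` as a stand-in for nulls.

```scala
// Hypothetical in-memory model of a left outer equi-join on `key` whose condition
// contains a right-only conjunct. Not Spark code.
object LeftOuterRightOnly {
  final case class L(key: Int, a: Int)
  final case class R(key: Int, c: Int)

  def leftOuterJoin(left: Seq[L], right: Seq[R], rightOnly: R => Boolean): Seq[(L, Option[R])] = {
    // Evaluate the right-only conjunct early: right rows that fail it can never match,
    // so they are never kept in the lookup map (the analogue of state).
    val stored: Map[Int, Seq[R]] = right.filter(rightOnly).groupBy(_.key)
    left.flatMap { l =>
      val matches = stored.getOrElse(l.key, Seq.empty)
      if (matches.isEmpty) Seq(l -> Option.empty[R]) // outer row padded with "null"
      else matches.map(r => l -> Option(r))
    }
  }
}
```

Take left rows `L(1, 0)` and `L(2, 0)`, right rows `R(1, 5)` and `R(2, 20)`, and the condition `c > 10`. The expected output pairs `L(1, 0)` with `None` and `L(2, 0)` with `R(2, 20)`. The mirror-image case, a left-only conjunct in a right outer join, swaps the roles.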


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144099311
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    --- End diff --
    
    `joined` is not clear. Does it mean conjuncts whose references are in neither the left child nor the right child? Or does it refer to the `left` and `right` params just above? If it's the latter, maybe just say "conjuncts which require references from both sides of the join".
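The sketch below makes the three buckets concrete. It assumes that a conjunct can be summarized by the columns it references plus a determinism flag; `Conjunct` and `SplitCondition` are hypothetical names. It follows the span-then-partition order visible in later revisions of this diff:

1. Only the deterministic prefix of the conjunct list is eligible for one-sided evaluation.
2. Conjuncts that reference only left columns are taken first, so a literal-only conjunct lands on the left.
3. Whatever remains goes to the last bucket. This includes everything from the first nondeterministic conjunct onward.

```scala
object SplitCondition {
  // Hypothetical summary of a conjunct: the columns it references and whether it is deterministic.
  final case class Conjunct(sql: String, refs: Set[String], deterministic: Boolean = true)

  // Returns (leftSideOnly, rightSideOnly, bothSides).
  def split(conjuncts: Seq[Conjunct], leftCols: Set[String], rightCols: Set[String])
      : (Seq[Conjunct], Seq[Conjunct], Seq[Conjunct]) = {
    // Only the prefix before the first nondeterministic conjunct may be evaluated early.
    val (candidates, rest) = conjuncts.span(_.deterministic)
    // Applied in order: a conjunct with no references satisfies the first test (left).
    val (leftOnly, notLeft) = candidates.partition(_.refs.subsetOf(leftCols))
    val (rightOnly, needsBoth) = notLeft.partition(_.refs.subsetOf(rightCols))
    (leftOnly, rightOnly, needsBoth ++ rest)
  }
}
```

With left columns `a, b` and right columns `c, d`, the condition `a > b AND c > d AND a = c AND 1 = 1` splits as follows. `a > b` and `1 = 1` go to the left bucket, `c > d` goes to the right bucket, and `a = c` goes to the both-sides bucket. This matches the expectations in the "mixed conjuncts" test quoted earlier.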


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82706 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82706/testReport)** for PR 19452 at commit [`347b92d`](https://github.com/apache/spark/commit/347b92d2d663549eea30f2a61bd422a235ddbade).


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144372504
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -387,9 +400,10 @@ case class StreamingSymmetricHashJoinExec(
          *                          input row from this side and the matched row from the other side
          */
         def storeAndJoinWithOtherSide(
    -        otherSideJoiner: OneSideHashJoiner)(
    -        generateJoinedRow: (UnsafeRow, UnsafeRow) => JoinedRow): Iterator[InternalRow] = {
    -
    +        otherSideJoiner: OneSideHashJoiner,
    +        joinedFilter: InternalRow => Boolean)(
    --- End diff --
    
    Maybe add this to the constructor, like the other filter.
    You can rename them to something more consistent, like "preJoinFilter" and "postJoinFilter".
    Also add docs.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437488
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,68 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param leftSideOnly Deterministic conjuncts which reference only the left side of the join.
    +   * @param rightSideOnly Deterministic conjuncts which reference only the right side of the join.
    +   * @param bothSides Conjuncts which are nondeterministic, occur after a nondeterministic conjunct,
    +   *                  or reference both left and right sides of the join.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    leftSideOnly: Option[Expression],
    +    rightSideOnly: Option[Expression],
    +    bothSides: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +        JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be evaluated on only the left input.
    +      // * Conjuncts that can be evaluated on only the right input.
    +      // * Conjuncts that require both left and right input.
    +      //
    +      // Note that these splits are applied in order, so the first category will end up containing
    +      // conjuncts which depend on neither the left nor right input.
    +      //
    +      // Note also that nondeterministic conjuncts effectively require both left and right input.
    +      // To maintain their semantics, they need to be evaluated exactly once per joined row.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          // Span rather than partition, because nondeterministic expressions don't commute
    +          // across AND.
    --- End diff --
    
    good!


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82526/
    Test PASSed.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82526 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82526/testReport)** for PR 19452 at commit [`c90fb3c`](https://github.com/apache/spark/commit/c90fb3cb1e112edeacb4be2604a7b628f55697f4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144664962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -349,12 +356,28 @@ case class StreamingSymmetricHashJoinExec(
       /**
        * Internal helper class to consume input rows, generate join output rows using other sides
        * buffered state rows, and finally clean up this sides buffered state rows
    +   *
    +   * @param joinSide The JoinSide - either left or right.
    +   * @param inputAttributes The input attributes for this side of the join.
    +   * @param joinKeys The join keys.
    +   * @param inputIter The iterator of input rows on this side to be joined.
    +   * @param preJoinFilter A filter over rows on this side. This filter rejects rows that could
    +   *                      never pass the overall join condition no matter what other side row
    +   *                      they're joined with.
    +   * @param postJoinFilter A filter over joined rows. This filter completes the application of the
    +   *                       overall join condition, assuming that preJoinFilter on both sides of the
    +   *                       join has already been passed.
    --- End diff --
    
    ^ awesome docs, very clear.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82743/testReport)** for PR 19452 at commit [`94dfa85`](https://github.com/apache/spark/commit/94dfa85242b350df2f630707fd27181e91fdf7ce).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437093
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -161,6 +164,10 @@ case class StreamingSymmetricHashJoinExec(
         new SerializableConfiguration(SessionState.newHadoopConf(
           sparkContext.hadoopConfiguration, sqlContext.conf)))
     
    +
    +  val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
    +  val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
    --- End diff --
    
    This can also be moved into the Joiner. Use `Joiner.inputAttributes` instead of `left.output`/`right.output`.
    Basically, if you have to define variables named "left" outside the Joiner, it is highly likely that you can move them into the Joiner, as it is meant to capture all the context needed to process one side.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82706 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82706/testReport)** for PR 19452 at commit [`347b92d`](https://github.com/apache/spark/commit/347b92d2d663549eea30f2a61bd422a235ddbade).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82818 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82818/testReport)** for PR 19452 at commit [`8eb1228`](https://github.com/apache/spark/commit/8eb1228b7c6391b7eeda61b6c758b01244f7608a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144097795
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    why is this `span` and not `partition`?
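The answer to this question is `span`, and the sketch below shows why. It uses hypothetical names, a counter as a stand-in for `rand()`, and assumes short-circuit `AND`. First it compares the two splits. Then it counts how often the nondeterministic conjunct is evaluated. `partition` would pull `b > 2` out ahead of `rand() < 0.5`. That changes how many times the stateful conjunct runs, and therefore which values it produces.

```scala
object SpanVsPartition {
  // Hypothetical conjunct summary: its text and whether it is deterministic.
  final case class Conj(sql: String, deterministic: Boolean)

  val conjuncts = Seq(Conj("a > 1", true), Conj("rand() < 0.5", false), Conj("b > 2", true))

  // span stops at the first nondeterministic conjunct; partition also pulls out "b > 2".
  val spanned: (Seq[Conj], Seq[Conj]) = conjuncts.span(_.deterministic)
  val partitioned: (Seq[Conj], Seq[Conj]) = conjuncts.partition(_.deterministic)

  // Counts calls to a stateful stand-in for rand() under short-circuit AND.
  def ndCalls(rows: Seq[(Int, Int)], ndFirst: Boolean): Int = {
    var calls = 0
    def nd(): Boolean = { calls += 1; calls % 2 == 0 } // stateful, like a seeded rand()
    rows.foreach { case (a, b) =>
      if (ndFirst) a > 1 && nd() && b > 2 // original conjunct order
      else a > 1 && b > 2 && nd()         // order implied by partition
    }
    calls
  }
}
```

For rows `(2, 0)`, `(2, 5)`, `(0, 5)`, the original order calls the stand-in twice. Evaluating `b > 2` first calls it only once.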


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82710 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82710/testReport)** for PR 19452 at commit [`6753825`](https://github.com/apache/spark/commit/67538255c00e01a8b1553c82c2c83b5ae0a7ddde).


---


[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82794/
    Test PASSed.


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144449963
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -161,6 +164,10 @@ case class StreamingSymmetricHashJoinExec(
         new SerializableConfiguration(SessionState.newHadoopConf(
           sparkContext.hadoopConfiguration, sqlContext.conf)))
     
    +
    +  val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
    +  val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
    --- End diff --
    
    right!! 
    you could pass it on as another param `otherSideNullRow` in `storeAndJoinWithOtherSide`. I am fine either way.
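A sketch of the per-row decision, with the null row passed in as `otherSideNullRow`. It is loosely modeled on the PR description, which says rows that can never satisfy the join condition should not clog up the state. The types `Side` and `JoinType` and the function `process` are hypothetical, and `process` is generic over the row type. The real joiner also joins eligible rows against the other side's state; that part is not modeled here. A row that fails its side's pre-join filter is never stored. On the outer side of an outer join it is emitted immediately with the null row. Otherwise it is dropped.

```scala
object PreJoinFilterPath {
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Returns (joined rows emitted right away, whether the input row should be stored in state).
  def process[Row](
      side: Side,
      joinType: JoinType,
      row: Row,
      otherSideNullRow: Row,
      preJoinFilter: Row => Boolean): (Seq[(Row, Row)], Boolean) = {
    if (preJoinFilter(row)) (Seq.empty, true) // eligible: store it (joining not modeled)
    else (side, joinType) match {
      // Can never satisfy the condition: on the outer side, emit a null-padded row now.
      case (LeftSide, LeftOuter)   => (Seq((row, otherSideNullRow)), false)
      case (RightSide, RightOuter) => (Seq((otherSideNullRow, row)), false)
      case _                       => (Seq.empty, false) // inner side or inner join: drop it
    }
  }
}
```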


---


[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144533029
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
    +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
    +import org.apache.spark.sql.types.DataTypes
    +
    +class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
    +  import org.apache.spark.sql.functions._
    +
    +  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
    +  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
    +  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
    +  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
    --- End diff --
    
    The tests would be much more intuitive, and therefore easier to read, if these columns and variables were named so that it is obvious whether they belong to the left or the right side, e.g. `leftAttribute1` instead of `attributeA`, `leftAttribute2` instead of `attributeB`, etc.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82818 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82818/testReport)** for PR 19452 at commit [`8eb1228`](https://github.com/apache/spark/commit/8eb1228b7c6391b7eeda61b6c758b01244f7608a).


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144532481
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
    +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
    +import org.apache.spark.sql.types.DataTypes
    +
    +class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
    +  import org.apache.spark.sql.functions._
    +
    +  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
    +  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
    +  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
    +  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
    +  val colA = new Column(attributeA)
    +  val colB = new Column(attributeB)
    +  val colC = new Column(attributeC)
    +  val colD = new Column(attributeD)
    +
    +  val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq())
    +  val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq())
    +
    +  test("empty") {
    +    val split = JoinConditionSplitPredicates(None, left, right)
    +    assert(split.leftSideOnly.isEmpty)
    +    assert(split.rightSideOnly.isEmpty)
    +    assert(split.bothSides.isEmpty)
    +    assert(split.full.isEmpty)
    +  }
    +
    +  test("only literals") {
    +    // Literal-only conjuncts end up on the left side because that's the first bucket they fit in.
    +    // There's no semantic reason they couldn't be in any bucket.
    +    val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === lit(-1)).expr
    +    val split = JoinConditionSplitPredicates(Some(predicate), left, right)
    +
    +    assert(split.leftSideOnly.contains(predicate))
    +    assert(split.rightSideOnly.isEmpty)
    --- End diff --
    
    Nit: why should `rightSideOnly` be empty? The literals CAN be evaluated using the right side only. This feels like a very asymmetrical situation.
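    A minimal sketch of the symmetric alternative, using a simplified conjunct model (attribute names only, not Catalyst expressions); `Conjunct` and `oneSidedBuckets` are hypothetical. If each side's bucket takes every conjunct whose references are a subset of that side's output, then a literal-only conjunct, which references nothing, lands in both buckets, because the empty set is a subset of every set.

```scala
// Simplified model for illustration only: a conjunct is described by the
// attribute names it references, not by a Catalyst expression.
object LiteralConjunctSketch {
  final case class Conjunct(sql: String, references: Set[String])

  // Symmetric bucketing: a conjunct goes to a side whenever all of its
  // references come from that side. Literal-only conjuncts go to both.
  def oneSidedBuckets(
      conjuncts: Seq[Conjunct],
      leftOutput: Set[String],
      rightOutput: Set[String]): (Seq[Conjunct], Seq[Conjunct]) =
    (conjuncts.filter(_.references.subsetOf(leftOutput)),
      conjuncts.filter(_.references.subsetOf(rightOutput)))
}
```

    Putting such a conjunct in both buckets should be harmless as long as it is deterministic, since evaluating it on either side gives the same answer.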


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144132422
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -355,12 +364,16 @@ case class StreamingSymmetricHashJoinExec(
           inputAttributes: Seq[Attribute],
           joinKeys: Seq[Expression],
           inputIter: Iterator[InternalRow],
    +      thisSideFilterExpr: Option[Expression],
    --- End diff --
    
    Rename to `joinSideFilter` (since we have `joinSide`).


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144436721
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
         val updateStartTimeNs = System.nanoTime
         val joinedRow = new JoinedRow
     
    +    // Filter the joined rows based on the given condition.
    +    val leftPreJoinFilter =
    +      newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    I am not sure this should be a predicate on the output. We are applying it to the input row, so the predicate must be built against that row's attributes, which in this case are `left.output`.
    
    I wonder why this isn't failing any test.
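    One plausible explanation, sketched with a simplified binding model (attribute names resolved to ordinals in a schema, then read positionally; this is not Spark's `newPredicate`). Assuming `output` is laid out as `left.output ++ right.output`, every left attribute has the same ordinal in `output` as in `left.output`, so a left-only predicate built against `output` happens to work on a bare left row. The same mistake on the right side would resolve to ordinals past the end of a right row. `bind` and `greaterThan` are hypothetical helpers.

```scala
// Simplified binding model for illustration only; not Spark's newPredicate.
object BindingSketch {
  type Row = Vector[Int]

  // Resolve an attribute name to its ordinal in the schema the predicate is built against.
  def bind(attr: String, schema: Seq[String]): Int = {
    val ordinal = schema.indexOf(attr)
    require(ordinal >= 0, s"$attr not found in $schema")
    ordinal
  }

  // Build the predicate `attr > threshold` against `schema`; the returned
  // function reads the field positionally, like a bound reference.
  def greaterThan(attr: String, threshold: Int, schema: Seq[String]): Row => Boolean = {
    val ordinal = bind(attr, schema)
    row => row(ordinal) > threshold
  }
}
```

    For example, with left attributes `(a, b)` and right attributes `(c, d)`, `b` binds to ordinal 1 in both `output` and `left.output`, while `d` binds to ordinal 3 in `output` but to ordinal 1 in `right.output`.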


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144435919
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -127,12 +127,13 @@ case class StreamingSymmetricHashJoinExec(
         leftKeys: Seq[Expression],
         rightKeys: Seq[Expression],
         joinType: JoinType,
    -    condition: Option[Expression],
    +    condition: JoinConditionSplitPredicates,
         stateInfo: Option[StatefulOperatorStateInfo],
         eventTimeWatermark: Option[Long],
         stateWatermarkPredicates: JoinStateWatermarkPredicates,
         left: SparkPlan,
    -    right: SparkPlan) extends SparkPlan with BinaryExecNode with StateStoreWriter {
    +    right: SparkPlan) extends SparkPlan
    +  with BinaryExecNode with StateStoreWriter {
    --- End diff --
    
    nit: unnecessary line split.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144450136
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.Column
    +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    +import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
    +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
    +import org.apache.spark.sql.types.DataTypes
    +
    +class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
    +  import org.apache.spark.sql.functions._
    +
    +  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
    --- End diff --
    
    nit: just importing `org.apache.spark.sql.types._` is sufficient to use `IntegerType`; you don't need `DataTypes.IntegerType`.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r143879599
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    --- End diff --
    
    left -> leftSideOnly
    right -> rightSideOnly
    joined -> bothSides
    full


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144440844
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -161,6 +164,10 @@ case class StreamingSymmetricHashJoinExec(
         new SerializableConfiguration(SessionState.newHadoopConf(
           sparkContext.hadoopConfiguration, sqlContext.conf)))
     
    +
    +  val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
    +  val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
    --- End diff --
    
    The problem is that the left joiner has left input attributes, but needs the right null row.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82533 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82533/testReport)** for PR 19452 at commit [`8c2a39f`](https://github.com/apache/spark/commit/8c2a39fcb3e425a91d25505ae9d29ba8ac670e0e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class JoinConditionSplitPredicates(`


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144130110
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -127,12 +127,13 @@ case class StreamingSymmetricHashJoinExec(
         leftKeys: Seq[Expression],
         rightKeys: Seq[Expression],
         joinType: JoinType,
    -    condition: Option[Expression],
    +    condition: JoinConditionSplitPredicates,
         stateInfo: Option[StatefulOperatorStateInfo],
         eventTimeWatermark: Option[Long],
         stateWatermarkPredicates: JoinStateWatermarkPredicates,
         left: SparkPlan,
    -    right: SparkPlan) extends SparkPlan with BinaryExecNode with StateStoreWriter {
    +    right: SparkPlan) extends SparkPlan
    +  with BinaryExecNode with StateStoreWriter with PredicateHelper {
    --- End diff --
    
    `PredicateHelper` is not needed any more.
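    For context, the splitting now happens in the `JoinConditionSplitPredicates` companion object, which extends `PredicateHelper` itself. A minimal sketch of what splitting a condition into conjuncts means, on a toy expression tree rather than Catalyst's (`Expr`, `And`, `Pred`, `splitConjuncts` and `combine` are hypothetical): nested ANDs are flattened in left-to-right order, and a bucket of conjuncts is folded back into a single predicate, or `None` if the bucket is empty.

```scala
// Toy expression tree for illustration only; not Catalyst's Expression.
object SplitConjunctsSketch {
  sealed trait Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Pred(sql: String) extends Expr

  // Flatten nested ANDs into a list of conjuncts, preserving their order.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Rebuild one predicate from a bucket of conjuncts; None if the bucket is empty.
  def combine(conjuncts: Seq[Expr]): Option[Expr] =
    conjuncts.reduceOption[Expr](And(_, _))
}
```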


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82819/testReport)** for PR 19452 at commit [`23db42c`](https://github.com/apache/spark/commit/23db42c5ae2011d57ba774435306487bac76f01b).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82533 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82533/testReport)** for PR 19452 at commit [`8c2a39f`](https://github.com/apache/spark/commit/8c2a39fcb3e425a91d25505ae9d29ba8ac670e0e).


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144440608
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
         val updateStartTimeNs = System.nanoTime
         val joinedRow = new JoinedRow
     
    +    // Filter the joined rows based on the given condition.
    +    val leftPreJoinFilter =
    +      newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    Moving it in would require either also passing in the `left ++ right` input attributes, or passing the preJoin and postJoin filters differently. I'm not sure which option is cleaner, so I'll make whichever change you think is best.
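    A minimal sketch of the second option (passing the pre-join and post-join filters separately), using `Map`-based rows instead of `InternalRow`; `SideJoiner` and `storeAndJoin` here are hypothetical and much simpler than the real joiner. The pre-join filter only ever sees this side's input row, so it can be built against this side's attributes; the post-join filter sees the combined row. Rows that fail the pre-join filter are never added to state.

```scala
// Simplified model of one side's per-row processing; not Spark's joiner.
object PrePostJoinFilterSketch {
  type Row = Map[String, Int]

  // preJoinFilter: evaluated on this side's input row alone.
  // postJoinFilter: evaluated on the combined (this side ++ other side) row.
  final class SideJoiner(
      preJoinFilter: Row => Boolean,
      postJoinFilter: Row => Boolean) {
    val state = scala.collection.mutable.ArrayBuffer.empty[Row]

    def storeAndJoin(input: Row, otherSideState: Seq[Row]): Seq[Row] =
      if (preJoinFilter(input)) {
        state += input // only rows that could still match are kept in state
        otherSideState.map(other => input ++ other).filter(postJoinFilter)
      } else {
        Seq.empty // inner-join behavior: the row can never match, so drop it
      }
  }
}
```

    In this model an inner join simply drops such rows; for the preserved side of an outer join, the natural counterpart would be to emit a null-padded row immediately instead of storing the row.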


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Waiting for build 82819 to pass.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82533/


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144128285
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    +
    +          val (leftConjuncts, nonLeftConjuncts) = candidates.partition { cond =>
    +            cond.references.subsetOf(left.outputSet)
    +          }
    +
    +          val (rightConjuncts, remainingConjuncts) = candidates.partition { cond =>
    --- End diff --
    
    This isn't `remainingConjuncts`: you are partitioning `candidates`, not `nonLeftConjuncts`.
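    A minimal sketch of one consistent naming, on a simplified conjunct model (attribute names only; `Conjunct` and `split` are hypothetical). If both partitions run over `candidates`, the second result is really `nonRightConjuncts`, and the conjuncts that need both sides are the ones that fit neither one-sided bucket. With `a > 0`, `c < 5` and `a = c`, the misnamed `remainingConjuncts` would still contain the left-only `a > 0`.

```scala
// Simplified model for illustration only: conjuncts carry the attribute names
// they reference instead of being Catalyst expressions.
object PartitionSketch {
  final case class Conjunct(sql: String, references: Set[String])

  // Returns (leftSideOnly, rightSideOnly, bothSides).
  def split(
      candidates: Seq[Conjunct],
      leftOutput: Set[String],
      rightOutput: Set[String]): (Seq[Conjunct], Seq[Conjunct], Seq[Conjunct]) = {
    val (leftConjuncts, nonLeftConjuncts) =
      candidates.partition(_.references.subsetOf(leftOutput))
    // Partitioning `candidates` again, so the second result is named for what it is.
    val (rightConjuncts, nonRightConjuncts) =
      candidates.partition(_.references.subsetOf(rightOutput))
    // Only conjuncts that fit neither one-sided bucket must wait for the joined row.
    val bothSidesConjuncts = nonLeftConjuncts.filter(c => nonRightConjuncts.contains(c))
    (leftConjuncts, rightConjuncts, bothSidesConjuncts)
  }
}
```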


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144436075
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -127,12 +127,13 @@ case class StreamingSymmetricHashJoinExec(
         leftKeys: Seq[Expression],
         rightKeys: Seq[Expression],
         joinType: JoinType,
    -    condition: Option[Expression],
    +    condition: JoinConditionSplitPredicates,
    --- End diff --
    
    update docs.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144131323
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -220,37 +232,36 @@ case class StreamingSymmetricHashJoinExec(
         //    stored left input, and also stores all the right input. It also generates all rows from
         //    matching new left input with new right input, since the new left input has become stored
         //    by that point. This tiny asymmetry is necessary to avoid duplication.
    -    val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(input).withRight(matched)
    +    val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(
    +      rightSideJoiner, joinedFilter) {
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(input).withRight(matched)
         }
    -    val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(matched).withRight(input)
    +    val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(
    +      leftSideJoiner, joinedFilter) {
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(matched).withRight(input)
         }
     
    -    // Filter the joined rows based on the given condition.
    -    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
    -
         // We need to save the time that the inner join output iterator completes, since outer join
         // output counts as both update and removal time.
         var innerOutputCompletionTimeNs: Long = 0
         def onInnerOutputCompletion = {
           innerOutputCompletionTimeNs = System.nanoTime
         }
         val filteredInnerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    --- End diff --
    
    This is not a filtered iterator any more; please rename it.
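    For reference, once filtering moved into the joiners, this wrapper appears to do nothing except record when the inner output is exhausted. A minimal stand-in for that completion-callback behavior; `OnCompletionIterator` is hypothetical and is not Spark's `CompletionIterator` API.

```scala
// Hypothetical stand-in for the idea behind a completion iterator:
// wrap an iterator and run a callback exactly once, when it is exhausted.
object CompletionSketch {
  final class OnCompletionIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
      extends Iterator[A] {
    private var completed = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !completed) {
        completed = true
        onCompletion() // e.g. record innerOutputCompletionTimeNs
      }
      more
    }

    override def next(): A = underlying.next()
  }
}
```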


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r143879728
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    --- End diff --
    
    Conjuncts that can be evaluated using only the left input
    (no need to refer to storing, as that's an internal detail).


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144128720
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    nit: rename candidates to deterministicConjuncts
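To make the three-way split in the doc comment above concrete, here is a minimal, self-contained Scala sketch. It does not use Catalyst: `Conjunct`, `SplitPredicates` and `ConditionSplitter` are hypothetical stand-ins, and column references are modeled as qualified column names rather than expression IDs. It returns lists of conjuncts instead of the `Option[Expression]` values the real class holds (those would be the same lists reduced with `And`).

```scala
// Hypothetical stand-in for a Catalyst conjunct: its SQL text, the columns it
// references, and whether it is deterministic.
final case class Conjunct(sql: String, references: Set[String], deterministic: Boolean = true)

// Hypothetical result type; the real class holds Option[Expression] values instead.
final case class SplitPredicates(left: Seq[Conjunct], right: Seq[Conjunct], joined: Seq[Conjunct])

object ConditionSplitter {
  def split(
      conjuncts: Seq[Conjunct],
      leftColumns: Set[String],
      rightColumns: Set[String]): SplitPredicates = {
    // Stop at the first nondeterministic conjunct: it and everything after it
    // must be evaluated on the joined row, in the original order.
    val (deterministicConjuncts, rest) = conjuncts.span(_.deterministic)
    // Deterministic conjuncts that reference only one side can run before storing state.
    val (leftOnly, notLeftOnly) =
      deterministicConjuncts.partition(_.references.subsetOf(leftColumns))
    val (rightOnly, bothSides) =
      notLeftOnly.partition(_.references.subsetOf(rightColumns))
    SplitPredicates(leftOnly, rightOnly, bothSides ++ rest)
  }
}
```

For `l.v > 5 AND r.w < 3 AND l.k = r.k AND rand() > 0.5 AND l.z > 0`, this sketch puts `l.v > 5` in left, `r.w < 3` in right, and the remaining three conjuncts in joined; `l.z > 0` lands in joined only because it follows the nondeterministic `rand() > 0.5`.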


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82748/


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82819/


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144937226
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -221,43 +228,29 @@ case class StreamingSymmetricHashJoinExec(
         //    matching new left input with new right input, since the new left input has become stored
         //    by that point. This tiny asymmetry is necessary to avoid duplication.
         val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(input).withRight(matched)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(input).withRight(matched)
         }
         val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(matched).withRight(input)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(matched).withRight(input)
         }
     
    -    // Filter the joined rows based on the given condition.
    -    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
    -
         // We need to save the time that the inner join output iterator completes, since outer join
         // output counts as both update and removal time.
         var innerOutputCompletionTimeNs: Long = 0
         def onInnerOutputCompletion = {
           innerOutputCompletionTimeNs = System.nanoTime
         }
    -    val filteredInnerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    -      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction), onInnerOutputCompletion)
    -
    -    def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
    -      rightSideJoiner.get(leftKeyValue.key).exists(
    -        rightValue => {
    -          outputFilterFunction(
    -            joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
    -        })
    -    }
    +    // This is the iterator which produces the inner join rows. For outer joins, this will be
    +    // prepended to a second iterator producing outer join rows; for inner joins, this is the full
    +    // output.
    +    val innerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    +      (leftOutputIter ++ rightOutputIter), onInnerOutputCompletion)
     
    -    def matchesWithLeftSideState(rightKeyValue: UnsafeRowPair) = {
    -      leftSideJoiner.get(rightKeyValue.key).exists(
    -        leftValue => {
    -          outputFilterFunction(
    -            joinedRow.withLeft(leftValue).withRight(rightKeyValue.value))
    -        })
    -    }
     
    +    val postJoinFilter = newPredicate(condition.bothSides.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    This is also incorrect.
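The PR description says rows that can never satisfy the join condition should not clog up the state, even in outer joins. The sketch below illustrates that idea on a toy model of one side of a symmetric hash join; `Row`, `OneSide` and `storeAndJoin` are hypothetical names, and the model ignores watermarks, state eviction and the outer rows emitted when stored rows expire. A row that fails the one-sided (pre-join) filter is never stored: on the outer side of an outer join it is emitted at once with no match, and in an inner join it is dropped. A row that passes is joined against the other side's state, the matches are filtered with the both-sides (post-join) predicate, and the row is then stored.

```scala
import scala.collection.mutable

// Hypothetical toy row: a join key and one value column.
final case class Row(key: Int, value: Int)

// Hypothetical toy model of one side of a stream-stream join (no watermarks or eviction).
final class OneSide(preJoinFilter: Row => Boolean) {
  // State for this side: join key -> rows stored so far.
  val state = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]

  def storeAndJoin(
      input: Iterator[Row],
      otherState: scala.collection.Map[Int, scala.collection.Seq[Row]],
      postJoinFilter: (Row, Row) => Boolean,
      outerOnThisSide: Boolean): Iterator[(Row, Option[Row])] =
    input.flatMap { row =>
      if (preJoinFilter(row)) {
        // Could still match: join with the other side's stored rows, then store this row.
        val matches = otherState.getOrElse(row.key, Seq.empty).iterator
          .filter(other => postJoinFilter(row, other))
          .map(other => (row, Option(other)))
        state.getOrElseUpdate(row.key, mutable.ArrayBuffer.empty[Row]) += row
        matches
      } else if (outerOnThisSide) {
        // Can never match: emit the outer row now instead of storing the row.
        Iterator((row, Option.empty[Row]))
      } else {
        // Inner join: a row that can never match produces no output and no state.
        Iterator.empty
      }
    }
}
```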


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144435539
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -221,43 +237,36 @@ case class StreamingSymmetricHashJoinExec(
         //    matching new left input with new right input, since the new left input has become stored
         //    by that point. This tiny asymmetry is necessary to avoid duplication.
         val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(input).withRight(matched)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(input).withRight(matched)
         }
         val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
    -      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(matched).withRight(input)
    +      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(matched).withRight(input)
         }
     
    -    // Filter the joined rows based on the given condition.
    -    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
    -
         // We need to save the time that the inner join output iterator completes, since outer join
         // output counts as both update and removal time.
         var innerOutputCompletionTimeNs: Long = 0
         def onInnerOutputCompletion = {
           innerOutputCompletionTimeNs = System.nanoTime
         }
    -    val filteredInnerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    -      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction), onInnerOutputCompletion)
    +    val innerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
    --- End diff --
    
    nit: can you add a description of what "inner output" is? The code is getting more complex, so I think it's better to add more docs.
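For readers following along: "inner output" here means the rows produced by matching new input against the other side's state, first for new left rows and then for new right rows. For outer joins, the outer join rows come from a second iterator appended after it, which is why the code records the moment the inner iterator is exhausted. The sketch below mimics that with a hypothetical `OnCompleteIterator`, a simplified stand-in for Spark's internal `CompletionIterator` rather than its actual implementation.

```scala
// Hypothetical, simplified completion-aware iterator: runs `onComplete` exactly once,
// the first time the wrapped iterator reports that it has no more rows.
final class OnCompleteIterator[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
    }
    more
  }

  override def next(): A = underlying.next()
}

object InnerOutput {
  // Inner output: matches generated by new left rows, then matches generated by new right rows.
  def apply[A](leftOutput: Iterator[A], rightOutput: Iterator[A], onComplete: () => Unit): Iterator[A] =
    new OnCompleteIterator(leftOutput ++ rightOutput, onComplete)
}
```

In an outer join, the callback would record something like `System.nanoTime()`, and the outer rows would be appended with `++`, which in Scala only evaluates the second iterator once the first is drained.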


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144449650
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
         val updateStartTimeNs = System.nanoTime
         val joinedRow = new JoinedRow
     
    +    // Filter the joined rows based on the given condition.
    +    val leftPreJoinFilter =
    +      newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    preJoin: Joiner.inputAttributes should be sufficient as the input schema.
    postJoin: just use the overall output.
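As we understand the comment above, the question is which schema each predicate is bound to: a predicate built with `newPredicate(expression, inputSchema)` resolves its column references to positions in the schema it is given. A pre-join filter runs on a bare row from one side, so that side's input attributes are enough; the post-join filter runs on the joined row, so it needs the overall output. The toy binder below (hypothetical `BindingDemo.bind`, not a Spark API) shows the same ordinal lookup on plain `Seq[Any]` rows.

```scala
object BindingDemo {
  // Hypothetical toy binder: resolves a column name to its ordinal in the given schema,
  // loosely analogous to binding an expression to a list of input attributes.
  def bind(column: String, schema: Seq[String]): Seq[Any] => Any = {
    val ordinal = schema.indexOf(column)
    require(ordinal >= 0, s"$column not found in ${schema.mkString("[", ", ", "]")}")
    row => row(ordinal)
  }

  val leftInput: Seq[String] = Seq("l.key", "l.value")
  val rightInput: Seq[String] = Seq("r.key", "r.value")
  val joinedOutput: Seq[String] = leftInput ++ rightInput

  // Pre-join: evaluated on a bare left row, so bind against the left input schema.
  val leftPreJoin: Seq[Any] => Boolean = {
    val value = bind("l.value", leftInput)
    row => value(row).asInstanceOf[Int] > 5
  }

  // Post-join: references both sides, so bind against the joined output schema.
  val postJoin: Seq[Any] => Boolean = {
    val leftKey = bind("l.key", joinedOutput)
    val rightValue = bind("r.value", joinedOutput)
    row => leftKey(row).asInstanceOf[Int] < rightValue(row).asInstanceOf[Int]
  }
}
```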


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82743/


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144396781
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    This pattern appears in several places in PushDownPredicate, but none of them documents the reason for it, so I'm not sure where the right place to point to is. I'm adding documentation here describing why.
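A small sketch of why the split uses `span` rather than a filter or `partition` (hypothetical names; this is our reading of the "stateful semantics" rationale in the code comment): a nondeterministic conjunct such as `rand() > 0.5` can depend on how many rows it sees and in what order. With `span`, no conjunct is moved ahead of it. With `partition`, a later deterministic conjunct would be evaluated first, so the nondeterministic one would see a different set of rows.

```scala
object SpanVsPartition {
  final case class Conjunct(name: String, deterministic: Boolean)

  // span: only the deterministic prefix is eligible for early evaluation; the first
  // nondeterministic conjunct and everything after it keep their original order.
  def earlyCandidates(conjuncts: Seq[Conjunct]): (Seq[Conjunct], Seq[Conjunct]) =
    conjuncts.span(_.deterministic)

  // For contrast: partition pulls deterministic conjuncts out from behind a
  // nondeterministic one, changing which rows the nondeterministic one is evaluated on.
  def reorderedCandidates(conjuncts: Seq[Conjunct]): (Seq[Conjunct], Seq[Conjunct]) =
    conjuncts.partition(_.deterministic)
}
```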


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82528 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82528/testReport)** for PR 19452 at commit [`026a33b`](https://github.com/apache/spark/commit/026a33bdd01c28327543b416d4716e53fb259181).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82817/


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82748 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82748/testReport)** for PR 19452 at commit [`0a753ed`](https://github.com/apache/spark/commit/0a753ed36018efe0be5533084fc7b6040586cbb5).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82819 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82819/testReport)** for PR 19452 at commit [`23db42c`](https://github.com/apache/spark/commit/23db42c5ae2011d57ba774435306487bac76f01b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82528 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82528/testReport)** for PR 19452 at commit [`026a33b`](https://github.com/apache/spark/commit/026a33bdd01c28327543b416d4716e53fb259181).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82743/testReport)** for PR 19452 at commit [`94dfa85`](https://github.com/apache/spark/commit/94dfa85242b350df2f630707fd27181e91fdf7ce).


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    **[Test build #82710 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82710/testReport)** for PR 19452 at commit [`6753825`](https://github.com/apache/spark/commit/67538255c00e01a8b1553c82c2c83b5ae0a7ddde).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19452


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/19452
  
    LGTM. Will merge after tests pass.


---
