Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2016/05/11 06:23:04 UTC

[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/1981

    [Flink-2971][table] Add outer joins to the Table API

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink outerJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1981.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1981
    
----
commit 4eee6ea491ec81f2ed78294a3dbdf068ae75b6e3
Author: dawidwys <wy...@gmail.com>
Date:   2016-05-11T06:20:42Z

    [Flink-2971][table] Add outer joins to the Table API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64709373
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,57 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: LogicalNode,
    +    right: LogicalNode) extends Attribute {
    +
    +    override def toString = s"'$name"
    +
    +    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +      val joinInputField = left.output.zipWithIndex.find(_._1.name == name)
    +                           .orElse(
    +                             right.output.zipWithIndex.find(_._1.name == name)
    +                             .map(x => (x._1, left.output.length + x._2))).get
    +
    +      new RexInputRef(joinInputField._2,
    --- End diff --
    
    `Expression.toRexNode()` is called in `LogicalNode.construct()`. That means that the input operators are on top of the `RelBuilder` operations stack, so we can use `RelBuilder.field(int inputCount, int inputOrdinal, String fieldName)` and do not have to deal with the offset computation ourselves. 
    
    I would also do the lookup of the input-local index in the constructor of `JoinFieldReference` and add a flag that indicates whether the field comes from the left or the right input. That will be helpful when validating join conditions.
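Why "on top of the stack" matters: after `left.construct(relBuilder)` and `right.construct(relBuilder)`, the two join inputs are the two most recently pushed entries. The toy model below (a hypothetical `ToyBuilder`, not Calcite's `RelBuilder`) reproduces the ordering the PR relies on in `construct()`, where `relBuilder.peek(2, 0)` is taken as the left input and `relBuilder.peek(2, 1)` as the right one.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy operand stack; NOT Calcite's RelBuilder, only a model of the ordering.
final class ToyBuilder {
  private val stack = ArrayBuffer.empty[String]

  def push(node: String): ToyBuilder = { stack += node; this }

  // Among the top `inputCount` entries, ordinal 0 is the one pushed first,
  // i.e. the left input of a binary operator such as a join.
  def peek(inputCount: Int, inputOrdinal: Int): String = {
    require(inputCount <= stack.size && inputOrdinal >= 0 && inputOrdinal < inputCount)
    stack(stack.size - inputCount + inputOrdinal)
  }
}
```

Anything pushed earlier (for example an unrelated operator) sits below the two inputs and is not reachable through `peek(2, _)`.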



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64763844
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -342,21 +368,29 @@ case class Join(
       }
     
       private def testJoinCondition(expression: Expression): Unit = {
    -    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    -      failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    -    }
    +    def checkIfJoinCondition(exp : BinaryComparison) =
    +      if (exp.children match {
    +        case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil  =>
    +          x.belongsToLeft == y.belongsToLeft
    +        case _ => true
    +      }) {
    +        failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    +      }
     
         var equiJoinFound = false
    -    def validateConditions(exp: Expression) : Unit = exp match {
    -      case x: And => x.children.foreach(validateConditions(_))
    +    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
    --- End diff --
    
    You're absolutely right, my mistake, sorry! I fixed it.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221903295
  
    Ah, sorry. No, I didn't see your comment. Will think about that and reply later. Thanks! :-)



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64567865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---
    @@ -77,7 +76,7 @@ class DataSetJoin(
       }
     
       override def toString: String = {
    -    s"Join(where: ($joinConditionToString), join: ($joinSelectionToString))"
    +    s"$joinName(where: ($joinConditionToString), join: ($joinSelectionToString))"
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    Can you add the `joinType` to `explainTerms()`?
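A minimal sketch of the request, assuming a hypothetical `ToyRelWriter` that stands in for Calcite's `RelWriter` (only an `item(term, value)` method is modelled) and illustrative display names for each join type. The join type then shows up both in the node name, as in the `$joinName(...)` change in the diff, and as its own explain term.

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for Calcite's RelWriter: it only records (term, value) pairs.
final class ToyRelWriter {
  val terms: ListBuffer[(String, Any)] = ListBuffer.empty
  def item(term: String, value: Any): ToyRelWriter = { terms += ((term, value)); this }
}

// Hypothetical join kinds with illustrative display names.
sealed abstract class JoinKind(val displayName: String)
case object InnerJoinKind extends JoinKind("InnerJoin")
case object LeftOuterJoinKind extends JoinKind("LeftOuterJoin")
case object RightOuterJoinKind extends JoinKind("RightOuterJoin")
case object FullOuterJoinKind extends JoinKind("FullOuterJoin")

final case class ToyJoin(kind: JoinKind, condition: String, selection: String) {
  // The node name reflects the join type.
  override def toString: String =
    s"${kind.displayName}(where: ($condition), join: ($selection))"

  // Reporting the join type as a separate term makes it visible in plan explanations.
  def explainTerms(pw: ToyRelWriter): ToyRelWriter =
    pw.item("where", condition).item("join", selection).item("joinType", kind.displayName)
}
```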



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64568195
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---
    @@ -271,4 +270,89 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
         ds1.join(ds2).where('b === 'e).select('c, 'g)
       }
     
    +  @Test
    +  def testLeftJoinWithMultipleKeys(): Unit = {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.getConfig.setNullCheck(true)
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
    +
    +    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
    +
    +    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
    +      "Hello world,ABC\n" + "Hello world, how are you?,null\n" + "I am fine.,HIJ\n" +
    +      "I am fine.,IJK\n" + "Luke Skywalker,null\n" + "Comment#1,null\n" + "Comment#2,null\n" +
    +      "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
    +      "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
    +      "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
    +      "Comment#15,null\n"
    +    val results = joinT.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testLeftJoinWithFilterInJoinCondition(): Unit = {
    --- End diff --
    
    This test can be removed if we do not allow non-join predicates in join conditions.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64741369
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -272,23 +272,46 @@ case class Join(
         left.output ++ right.output
       }
     
    +  private object JoinFieldReference {
    +
    +    def apply(
    +      name: String,
    +      resultType: TypeInformation[_],
    +      left: LogicalNode,
    +      right: LogicalNode): JoinFieldReference = {
    +
    +      val joinInputField = left.output.zipWithIndex.find(_._1.name == name).map(_._2)
    +                           .orElse(right.output.zipWithIndex.find(_._1.name == name).map(x => left.output.length + x._2))
    +                           .getOrElse(
    +                             throw new NoSuchElementException(s"""Could not find field: $name"""))
    +                           .asInstanceOf[Int]
    +
    +      new JoinFieldReference(name, resultType, left, right, joinInputField)
    +    }
    +
    +  }
    +
       private case class JoinFieldReference(
    --- End diff --
    
    I am sorry, I think my last comment was confusing. Actually, I misinterpreted the semantics of the `RelBuilder.field(int, int, String)` method. I thought it would automatically add the offset of the left input. But apparently it doesn't...
    
    How about we change the `JoinFieldReference` to this:
    
    ```
    private case class JoinFieldReference(
          name: String,
          resultType: TypeInformation[_],
          left: LogicalNode,
          right: LogicalNode) extends Attribute {
    
        val isFromLeftInput = left.output.map(_.name).contains(name)
    
        val (indexInInput, indexInJoin) = if (isFromLeftInput) {
          val indexInLeft = left.output.map(_.name).indexOf(name)
          (indexInLeft, indexInLeft)
        } else {
          val indexInRight = right.output.map(_.name).indexOf(name)
          (indexInRight, indexInRight + left.output.length)
        }
    
        override def toString = s"'$name"
    
        override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    
          // look up type of field
          val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType()
          // create a new RexInputRef with index offset
          new RexInputRef(indexInJoin, fieldType)
        }
    
        override def withName(newName: String): Attribute = {
          if (newName == name) {
            this
          } else {
            JoinFieldReference(newName, resultType, left, right)
          }
        }
      }
    ```
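The index arithmetic in this proposal can be checked in isolation. The sketch below is a hypothetical, Calcite-free helper (`JoinFieldIndex.locate` is an illustrative name, not part of Flink) that works on plain field-name lists: it resolves the field once, records which input it came from, and adds the left field count for right-side fields. Like the earlier revision of the PR, it throws `NoSuchElementException` for unknown names.

```scala
// Hypothetical result type; the field names follow the proposal.
final case class JoinFieldIndex(isFromLeftInput: Boolean, indexInInput: Int, indexInJoin: Int)

object JoinFieldIndex {
  /** Resolves `name` once, e.g. when the reference is constructed. */
  def locate(name: String, leftFields: Seq[String], rightFields: Seq[String]): JoinFieldIndex = {
    val inLeft = leftFields.indexOf(name)
    if (inLeft >= 0) {
      // Left fields come first in the join row, so no offset is needed.
      JoinFieldIndex(isFromLeftInput = true, indexInInput = inLeft, indexInJoin = inLeft)
    } else {
      val inRight = rightFields.indexOf(name)
      if (inRight < 0) throw new NoSuchElementException(s"Could not find field: $name")
      // Right fields follow all left fields, so shift by the left field count.
      JoinFieldIndex(isFromLeftInput = false, indexInInput = inRight,
        indexInJoin = inRight + leftFields.length)
    }
  }
}
```

With the test schema `('a, 'b, 'c)` joined to `('d, 'e, 'f, 'g, 'h)`, `'g` is field 3 of the right input and field 6 of the join row.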



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895590
  
    --- Diff: docs/apis/table.md ---
    @@ -542,6 +578,42 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.rightOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    
    +    <tr>
    +      <td><strong>LeftOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.leftOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>FullOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.fullOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    --- End diff --
    
    `=` -> `===`
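In the Scala Table API an equality predicate has to be written with `===`: `=` is assignment syntax rather than a comparison, so `'a = 'd` does not build a predicate, and `==` is defined on every Scala object and returns a plain `Boolean`. A toy DSL (hypothetical classes, not Flink's expression types) shows the idea: the operator builds an expression node instead of evaluating anything.

```scala
// Hypothetical toy DSL; these are NOT Flink's expression classes.
sealed trait Expr {
  // Build nodes instead of evaluating: `==` on Any already returns a Boolean.
  def ===(other: Expr): Expr = EqualTo(this, other)
  def &&(other: Expr): Expr = And(this, other)
}
final case class Field(name: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
```

Because operators starting with `=` bind more tightly than `&&` in Scala, `a === d && b === h` groups as `(a === d) && (b === h)`, which is what a condition like `'a === 'd && 'b === 'h` relies on.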



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64567933
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala ---
    @@ -61,21 +51,22 @@ class DataSetJoinRule
         val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
         val joinInfo = join.analyzeCondition
     
    -    new DataSetJoin(
    -      rel.getCluster,
    -      traitSet,
    -      convLeft,
    -      convRight,
    -      rel.getRowType,
    -      join.getCondition,
    -      join.getRowType,
    -      joinInfo,
    -      joinInfo.pairs.toList,
    -      JoinType.INNER,
    -      null,
    -      description)
    +        new DataSetJoin(
    --- End diff --
    
    The indentation is off.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63257088
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---
    @@ -106,9 +107,9 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
         val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
     
         ds1.join(ds2)
    -      // must fail. Field 'foo does not exist
    --- End diff --
    
    I think this indentation was intended. Can you undo the change?



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63256282
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala ---
    @@ -259,14 +260,21 @@ class JoinITCase(
         tEnv.registerTable("Table3", ds1)
         tEnv.registerTable("Table5", ds2)
     
    -    tEnv.sql(sqlQuery).toDataSet[Row].collect()
    +    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
    +      "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
    +      "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
    +      "null,IJK\n" + "null,JKL\n" + "null,KLM"
    +
    +    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    -  @Test(expected = classOf[PlanGenException])
    +  @Test
       def testLeftOuterJoin(): Unit = {
     
         val env = ExecutionEnvironment.getExecutionEnvironment
         val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +    tEnv.getConfig.setNullCheck(true)
     
         val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"
    --- End diff --
    
    Switch Table3 and Table5 so that the result also contains null values.
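Why the swap matters: a left outer join keeps every row of the left input and pads the right side with NULL only for rows that find no partner. If every left key finds a match, the result contains no NULLs at all, so the null-handling path is never exercised. A sketch on plain Scala collections (hypothetical helper, toy data, `None` standing in for SQL NULL; this is not how Flink executes the join):

```scala
// Hypothetical helper illustrating LEFT OUTER JOIN semantics; not Flink's runtime.
object OuterJoinSketch {
  // Keeps every left row; a row without a key match is paired with None (SQL NULL).
  def leftOuterJoin[L, R, K](left: Seq[L], right: Seq[R])(
      leftKey: L => K, rightKey: R => K): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches = right.filter(r => rightKey(r) == leftKey(l))
      if (matches.isEmpty) Seq((l, Option.empty[R]))
      else matches.map(r => (l, Option(r)))
    }
}
```

With `small = Seq(1 -> "Hi", 2 -> "Hello")` and `big = Seq(1 -> "Hallo", 2 -> "Hallo Welt", 3 -> "ABC")`, joining `small` to `big` on the first element produces no `None`, while joining `big` to `small` leaves the key-3 row unmatched and padded with `None`.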



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64568112
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---
    @@ -191,12 +192,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
         val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
     
         val joinT = ds1.join(ds2)
    -      .where('a === 'd)
    -      .groupBy('a, 'd)
    -      .select('b.sum, 'g.count)
    +                .where('a === 'd)
    --- End diff --
    
    Please revert this indentation change.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895561
  
    --- Diff: docs/apis/table.md ---
    @@ -542,6 +578,42 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.rightOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    
    +    <tr>
    +      <td><strong>LeftOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    --- End diff --
    
    `FULL OUTER JOIN` should be `LEFT OUTER JOIN`



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64567800
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,60 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: RelNode,
    +    right: RelNode) extends Attribute {
    +
    +    override def toString = s"'$name"
    +
    +    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +      val joinInputField = if (left.getRowType.getFieldNames.contains(name)) {
    +        val field = left.getRowType.getField(name, false, false)
    +        (field.getIndex, field.getType)
    +      } else {
    +        val field = right.getRowType.getField(name, false, false)
    +        (field.getIndex + left.getRowType.getFieldCount, field.getType)
    +      }
    +
    +      new RexInputRef(joinInputField._1, joinInputField._2)
    +    }
    +
    +    override def withName(newName: String): Attribute = {
    +      if (newName == name) {
    +        this
    +      } else {
    +        JoinFieldReference(newName, resultType, left, right)
    +      }
         }
       }
     
       override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    -    joinType match {
    -      case JoinType.INNER =>
    -        left.construct(relBuilder)
    -        right.construct(relBuilder)
    -        relBuilder.join(JoinRelType.INNER,
    -          condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)))
    -      case _ =>
    -        throw new ValidationException(s"Unsupported JoinType: $joinType")
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    val partialFunction: PartialFunction[Expression, Expression] = {
    +      case field: ResolvedFieldReference => new JoinFieldReference(
    +        field.name,
    +        field.resultType,
    +        relBuilder.peek(2, 0),
    +        relBuilder.peek(2, 1))
         }
    +
    +    val transformedExpression = condition.map(_.postOrderTransform(partialFunction))
    +                                .map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true))
    +
    +    relBuilder.join(flinkJoinTypeToCalcite(joinType), transformedExpression)
    +  }
    +
    --- End diff --
    
    Can you implement the `validate()` method and check whether the join condition (if set) consists only of join predicates and contains at least one equality join predicate? Otherwise we would fail later during optimization.
    
    So something like `(left.a == right.b AND left.c < right.d)` would be OK, but the following would fail:
    - `(left.a < right.b)` // no equi-join predicate
    - `(left.a == right.b or left.c == right.d)` // disjunctive predicates are not supported at the moment
    - `(left.a == right.b and left.a > 10)` // non-join predicates would be OK, but I think it would be cleaner to do that in `where()`
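The rules listed above can be sketched as a small recursive check. Everything here is hypothetical: a toy expression tree in which each field carries a left/right flag, similar in spirit to the `belongsToLeft` flag used in `checkIfJoinCondition`. Flink's real `Expression` classes and `validate()` differ.

```scala
// Hypothetical mini expression tree; Flink's real classes differ.
sealed trait Expr
final case class Field(name: String, fromLeft: Boolean) extends Expr
final case class Lit(value: Any) extends Expr
final case class Cmp(op: String, left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
final case class Or(left: Expr, right: Expr) extends Expr

object JoinConditionCheck {
  /** Returns None if the condition is acceptable, otherwise an error message. */
  def validate(cond: Expr): Option[String] = {
    // Flatten nested ANDs into conjuncts; anything else counts as one conjunct.
    def conjuncts(e: Expr): List[Expr] = e match {
      case And(l, r) => conjuncts(l) ++ conjuncts(r)
      case other     => List(other)
    }
    // A join predicate compares one field from each input.
    def isJoinPredicate(e: Expr): Boolean = e match {
      case Cmp(_, Field(_, l1), Field(_, l2)) => l1 != l2
      case _                                  => false
    }
    val cs = conjuncts(cond)
    if (cs.exists(_.isInstanceOf[Or]))
      Some("Disjunctive join predicates are not supported.")
    else if (!cs.forall(isJoinPredicate))
      Some("Only join predicates supported. For non-join predicates use Table#where.")
    else if (!cs.exists { case Cmp("=", _, _) => true; case _ => false })
      Some("At least one equality join predicate is required.")
    else None
  }
}
```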



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221188902
  
    No worries and thanks @dawidwys!



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63255457
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -334,6 +335,156 @@ class Table(
         * }}}
         */
       def join(right: Table): Table = {
    +    join(right, new Literal(true, TypeInformation.of(classOf[Boolean])), JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.join(right, "a = b && c > 3")
    +    * }}}
    +    */
    +  def join(right: Table, joinPredicate: String): Table = {
    +    join(right, joinPredicate, JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.join(right, 'a === 'b && 'c > 3).select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def join(right: Table, joinPredicate: Expression): Table = {
    +    join(right, joinPredicate, JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.leftOuterJoin(right, "a = b && c > 3").select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def leftOuterJoin(right: Table, joinPredicate: String): Table = {
    +    join(right, joinPredicate, JoinType.LEFT_OUTER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.leftOuterJoin(right, 'a === 'b && 'c > 3).select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
    +    join(right, joinPredicate, JoinType.LEFT_OUTER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.rightOuterJoin(right, "a = b && c > 3").select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def rightOuterJoin(right: Table, joinPredicate: String): Table = {
    +    join(right, joinPredicate, JoinType.RIGHT_OUTER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.rightOuterJoin(right, 'a === 'b && 'c > 3).select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
    +    join(right, joinPredicate, JoinType.RIGHT_OUTER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.fullOuterJoin(right, "a = b && c > 3").select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def fullOuterJoin(right: Table, joinPredicate: String): Table = {
    +    join(right, joinPredicate, JoinType.FULL_OUTER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.fullOuterJoin(right, 'a === 'b && 'c > 3).select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
    +    join(right, joinPredicate, JoinType.FULL_OUTER)
    +  }
    +
    +  private def flinkJoinTypeToCalcite(joinType: JoinType) = joinType match {
    --- End diff --
    
    I think you can also directly use Calcite's `JoinRelType` instead of Flink's `JoinType`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221926857
  
    Thanks for looking into how Calcite splits the join conditions. You are right: we do not need to be "smarter" than Calcite and accept predicates that will later be rejected. So +1 for your suggestion.
    
    Apart from the documentation update, I would only extend the error messages with the conditions that cause the failure. After that, the PR should be good to merge.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64568059
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -251,12 +249,157 @@ class Table(
         * }}}
         */
       def join(right: Table): Table = {
    +    join(right, None, JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.join(right, "a = b && c > 3")
    --- End diff --
    
    If we decide to only allow join predicates as join conditions, we should remove the `c > 3` predicate.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-222138623
  
    Really sorry for those mistakes :( Thanks for fixing them.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63255146
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -334,6 +335,156 @@ class Table(
         * }}}
         */
       def join(right: Table): Table = {
    +    join(right, new Literal(true, TypeInformation.of(classOf[Boolean])), JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.join(right, "a = b && c > 3")
    +    * }}}
    +    */
    +  def join(right: Table, joinPredicate: String): Table = {
    +    join(right, joinPredicate, JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.join(right, 'a === 'b && 'c > 3).select('a, 'b, 'd)
    +    * }}}
    +    */
    +  def join(right: Table, joinPredicate: Expression): Table = {
    +    join(right, joinPredicate, JoinType.INNER)
    +  }
    +
    +  /**
    +    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
    +    * operations must not overlap, use [[as]] to rename fields if necessary.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
    +    * have nullCheck enables.
    --- End diff --
    
    should be "... must have nullCheck enableD"


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64709704
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -298,11 +333,34 @@ case class Join(
         val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
         if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
           failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
    -    } else if (!ambiguousName.isEmpty) {
    +    } else if (ambiguousName.nonEmpty) {
           failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
         }
    +
    +    resolvedJoin.condition.foreach(testJoinCondition(_))
         resolvedJoin
       }
    +
    +  private def testJoinCondition(expression: Expression): Unit = {
    +    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    --- End diff --
    
    The condition here should be: both children are `JoinFieldReference`s, and one refers to the left input while the other refers to the right input (that's where the input flag in `JoinFieldReference` is helpful).
    
    I would also change the type of `exp` to `BinaryComparison`.
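
A minimal sketch of the check described in this comment, using a hypothetical, simplified expression model: `Expr`, `FieldRef`, `Literal`, `EqualTo`, `LessThan` and the `fromLeft` flag are illustrative stand-ins, not Flink's actual classes. A comparison is accepted as a join predicate only if both operands are field references and exactly one of them comes from the left input.

```scala
// Hypothetical, simplified expression model; not Flink's Table API classes.
sealed trait Expr { def children: List[Expr] }

// A field reference that remembers which join input it was resolved against.
case class FieldRef(name: String, fromLeft: Boolean) extends Expr {
  def children: List[Expr] = Nil
}

case class Literal(value: Any) extends Expr {
  def children: List[Expr] = Nil
}

// Binary comparisons always expose exactly two children.
sealed trait BinaryComparison extends Expr {
  def lhs: Expr
  def rhs: Expr
  def children: List[Expr] = List(lhs, rhs)
}
case class EqualTo(lhs: Expr, rhs: Expr) extends BinaryComparison
case class LessThan(lhs: Expr, rhs: Expr) extends BinaryComparison

object JoinPredicateCheck {
  // True iff the comparison relates a field of one input to a field of the other.
  def isJoinPredicate(exp: BinaryComparison): Boolean = exp.children match {
    case (x: FieldRef) :: (y: FieldRef) :: Nil => x.fromLeft != y.fromLeft
    case _                                     => false // e.g. 'c > 3 or 'a === 'c
  }
}
```

Returning a `Boolean` keeps the sketch testable; in the PR the `false` branch would call `failValidation` instead.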


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895489
  
    --- Diff: docs/apis/table.md ---
    @@ -423,14 +423,50 @@ Table result = in.groupBy("a").select("a, b.sum as d");
         <tr>
           <td><strong>Join</strong></td>
           <td>
    -        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
    +        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator.</p>
     {% highlight java %}
     Table left = tableEnv.fromDataSet(ds1, "a, b, c");
     Table right = tableEnv.fromDataSet(ds2, "d, e, f");
     Table result = left.join(right).where("a = d").select("a, b, e");
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight java %}
    +Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    +Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    +Table result = left.rightOuterJoin(right, "a = d").select("a, b, e");
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    
    +    <tr>
    +      <td><strong>LeftOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    --- End diff --
    
    `FULL OUTER JOIN` should be `LEFT OUTER JOIN`


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64742780
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -342,21 +368,29 @@ case class Join(
       }
     
       private def testJoinCondition(expression: Expression): Unit = {
    -    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    -      failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    -    }
    +    def checkIfJoinCondition(exp : BinaryComparison) =
    +      if (exp.children match {
    --- End diff --
    
    The condition looks good, but it can be simplified a bit like this:
    ```
    exp.children match {
      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
        if x.isFromLeftInput != y.isFromLeftInput
          => // predicate references both inputs. All good!
      case _ => failValidation(
        s"Only join predicates supported. For non-join predicates use Table#where.")
    }
    ```


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64762144
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -342,21 +368,29 @@ case class Join(
       }
     
       private def testJoinCondition(expression: Expression): Unit = {
    -    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    -      failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    -    }
    +    def checkIfJoinCondition(exp : BinaryComparison) =
    +      if (exp.children match {
    +        case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil  =>
    +          x.belongsToLeft == y.belongsToLeft
    +        case _ => true
    +      }) {
    +        failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    +      }
     
         var equiJoinFound = false
    -    def validateConditions(exp: Expression) : Unit = exp match {
    -      case x: And => x.children.foreach(validateConditions(_))
    +    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
    --- End diff --
    
    I'm sorry, but this check does not catch all cases. For example, it would let `l.a = r.b OR (l.c = r.d && l.e = r.f)` pass.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1981


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221307901
  
    @fhueske I have uploaded the updated PR. Unfortunately, there are some strange VM crashes that I think are unrelated to the changes.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221187388
  
    Hi @dawidwys, do you have some time to work on this PR this week?
    I will be away next week plus a couple of days and would really like to merge it before then.
    Thanks, Fabian


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64710283
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -298,11 +333,34 @@ case class Join(
         val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
         if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
           failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
    -    } else if (!ambiguousName.isEmpty) {
    +    } else if (ambiguousName.nonEmpty) {
           failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
         }
    +
    +    resolvedJoin.condition.foreach(testJoinCondition(_))
         resolvedJoin
       }
    +
    +  private def testJoinCondition(expression: Expression): Unit = {
    +    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    +      failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    +    }
    +
    +    var equiJoinFound = false
    +    def validateConditions(exp: Expression) : Unit = exp match {
    --- End diff --
    
    Checking whether it is an equi-join is a bit more difficult. For instance, `l.a = r.b AND (l.c = r.d OR l.c = r.e)` is a valid equi-join condition. The first condition `(l.a = r.b)` can be pushed into the DataSet join (`left.join(right).where(a).equalTo(b)`), and the remaining condition can be evaluated in the `JoinFunction`.
    
    To do the check, the predicate should first be normalized into conjunctive normal form (CNF). Once that is done, the check is easy: at least one conjunct must be an equality between a field of the left input and a field of the right input.
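
The CNF-based check can be sketched as follows. This assumes a hypothetical, NOT-free predicate model (`Pred`, `Field`, `Eq`, `Other`, `And`, `Or` are illustrative, not Flink's or Calcite's classes; negations would first have to be pushed down to the leaves). CNF is obtained by distributing OR over AND, which can grow the predicate exponentially in the worst case; after that, the condition counts as an equi-join condition if some top-level conjunct is an equality between a left and a right field.

```scala
// Hypothetical, NOT-free predicate model; not Flink's or Calcite's classes.
sealed trait Pred
case class Field(name: String, fromLeft: Boolean)
case class Eq(l: Field, r: Field) extends Pred
case class Other(desc: String) extends Pred // any non-equality leaf, e.g. "c > 3"
case class And(l: Pred, r: Pred) extends Pred
case class Or(l: Pred, r: Pred) extends Pred

object EquiJoinCheck {
  // Convert to CNF: normalize both sides, then distribute OR over AND.
  def toCnf(p: Pred): Pred = p match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case leaf      => leaf
  }

  // CNF of (l OR r), given that l and r are already in CNF:
  // (a AND b) OR c  ==>  (a OR c) AND (b OR c), and symmetrically.
  private def distribute(l: Pred, r: Pred): Pred = (l, r) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case (a, b)         => Or(a, b)
  }

  // Flatten the top-level AND tree into its conjuncts.
  def conjuncts(p: Pred): List[Pred] = p match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // Equi-join: some conjunct equates a left field with a right field.
  def isEquiJoin(condition: Pred): Boolean =
    conjuncts(toCnf(condition)).exists {
      case Eq(x, y) => x.fromLeft != y.fromLeft
      case _        => false
    }
}
```

Applied to the two examples in this thread, the sketch accepts `l.a = r.b AND (l.c = r.d OR l.c = r.e)` and rejects `l.a = r.b OR (l.c = r.d && l.e = r.f)`, whose CNF is `(l.a = r.b OR l.c = r.d) AND (l.a = r.b OR l.e = r.f)`.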


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64565786
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,60 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: RelNode,
    --- End diff --
    
    I think JoinFieldReference should rather use `LogicalNode` instead of `RelNode` to refer to the inputs of the join.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895548
  
    --- Diff: docs/apis/table.md ---
    @@ -542,6 +578,42 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.rightOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    --- End diff --
    
    `=` should be `===`
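
Some background on why the operator matters: in Scala, `=` is assignment, not comparison, and `==` is defined on `Any` to return a `Boolean`, so an expression DSL that wants a comparison to build an expression tree typically introduces its own operator such as `===`. A minimal sketch of the idea with hypothetical `Field`, `EqualTo` and `And` classes (not the Table API's actual expression classes):

```scala
// Hypothetical mini expression DSL; names are illustrative, not Flink's classes.
sealed trait Expression {
  // Combines two predicates into a conjunction node instead of evaluating them.
  def &&(other: Expression): Expression = And(this, other)
}

case class Field(name: String) extends Expression {
  // Builds an equality node; `==` would just compare the two Field objects.
  def ===(other: Expression): Expression = EqualTo(this, other)
}

case class EqualTo(left: Expression, right: Expression) extends Expression
case class And(left: Expression, right: Expression) extends Expression
```

Because operators starting with `=` bind more tightly than those starting with `&` in Scala, `a === b && c === d` groups as `(a === b) && (c === d)`.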


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221846252
  
    Thanks @fhueske for your comments. I uploaded another version with my resolutions to the issues you've raised. Still happy to discuss.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-222131071
  
    Thanks for the update @dawidwys. I found a few minor issues in the docs. Will fix them and merge this PR.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64547118
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala ---
    @@ -17,9 +17,9 @@
      */
     package org.apache.flink.api.table.expressions
     
    -import org.apache.calcite.rex.RexNode
    +import org.apache.calcite.rel.RelNode
     import org.apache.calcite.tools.RelBuilder
    -
    +import org.apache.calcite.rex.{RexInputRef, RexNode}
    --- End diff --
    
    No need to change the imports in this file.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-218481504
  
    Thanks for this PR, @dawidwys! I haven't had a look yet but will do so soon. 
    
    Just a quick comment: this PR touches some files that PR #1958 is also modifying (mostly on the API level, not the runtime and optimization code). PR #1958 is a bigger change but in good shape and should be mergeable soon. I would like to merge #1958 before this one, so you will need to rebase once that has happened. That should not be too much work, just wanted to let you know in advance.
    
    Thanks, Fabian


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895504
  
    --- Diff: docs/apis/table.md ---
    @@ -542,6 +578,42 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    --- End diff --
    
    `FULL OUTER JOIN` should be `RIGHT OUTER JOIN`


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64714427
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,57 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: LogicalNode,
    +    right: LogicalNode) extends Attribute {
    +
    +    override def toString = s"'$name"
    +
    +    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +      val joinInputField = left.output.zipWithIndex.find(_._1.name == name)
    +                           .orElse(
    +                             right.output.zipWithIndex.find(_._1.name == name)
    +                             .map(x => (x._1, left.output.length + x._2))).get
    +
    +      new RexInputRef(joinInputField._2,
    --- End diff --
    
    Regarding `RelBuilder.field`, I also thought it works the way you described, but unfortunately in the case of a join it does not. `RelBuilder.field` returns an index into one of the separate inputs, while in the `Join` operation the index must point into the input formed by concatenating both inputs. That is why I compute the index myself.
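
The index arithmetic described here can be sketched in isolation: in the row type the join condition is evaluated against, left fields keep their positions and right fields are shifted by the number of left fields. `indexInJoinInput` is a hypothetical helper over plain field-name lists, not part of Flink or Calcite; unlike the `.get` in the diff, it returns `None` for unknown names.

```scala
// Hypothetical helper: position of a field in the concatenated (left ++ right)
// row type that a join condition refers to.
object JoinFieldIndex {
  def indexInJoinInput(name: String, left: Seq[String], right: Seq[String]): Option[Int] = {
    val inLeft = left.indexOf(name)
    if (inLeft >= 0) Some(inLeft) // left fields are looked up first, as in the diff
    else {
      val inRight = right.indexOf(name)
      // Right-side fields come after all left-side fields.
      if (inRight >= 0) Some(left.length + inRight) else None
    }
  }
}
```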


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221187695
  
    Hi @fhueske. Yes, I will update this PR today. Sorry I haven't done it earlier, but I was away for the past week.


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63257292
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---
    @@ -271,4 +270,86 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
         ds1.join(ds2).where('b === 'e).select('c, 'g)
       }
     
    +  @Test
    +  def testLeftJoinWithMultipleKeys(): Unit = {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.getConfig.setNullCheck(true)
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
    +
    +    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
    +
    +    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
    +      "Hello world,ABC\n" + "Hello world, how are you?,null\n" + "I am fine.,HIJ\n" +
    +      "I am fine.,IJK\n" + "Luke Skywalker,null\n" + "Comment#1,null\n" + "Comment#2,null\n" +
    +      "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
    +      "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
    +      "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
    +      "Comment#15,null\n"
    +    val results = joinT.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testLeftJoinWithFilterInJoinCondition(): Unit = {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.getConfig.setNullCheck(true)
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
    +
    +    val joinT = ds1.leftOuterJoin(ds2, 'a < 3 && 'b === 'd.cast(TypeInformation.of(classOf[Long])))
    --- End diff --
    
    Can you change the query such that the result has a null field?
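
For reference, a left outer join keeps every left row and fills the right-hand fields with `null` when no right row satisfies the join condition; a test whose result contains such rows covers the null handling that outer joins depend on. A minimal in-memory sketch of these semantics over plain Scala collections, with `Option` standing in for SQL `NULL` (a hypothetical nested-loop illustration, not how Flink executes outer joins):

```scala
object LeftOuterJoinSketch {
  // Nested-loop left outer join: every left row appears at least once;
  // a left row without any match is paired with None (SQL NULL).
  def leftOuterJoin[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches: Seq[Option[R]] = right.filter(r => cond(l, r)).map(r => Some(r))
      val padded: Seq[Option[R]] = if (matches.isEmpty) Seq(None) else matches
      padded.map(r => (l, r))
    }
}
```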


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r63253874
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---
    @@ -159,9 +158,20 @@ class DataSetJoin(
         val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
         val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    +    val (joinOperator, nullCheck) = joinType match {
    +      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
    +      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
    +      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
    +      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
    +    }
    +
    +    if (nullCheck && !config.getNullCheck) {
    +      throw new TableException("Null check in TableEnvironment must be enabled for outer joins.")
    --- End diff --
    
    `TableEnvironment` should be `TableConfig` in the exception message, since the null check is a `TableConfig` setting.
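The setting is reached through the config object (the tests enable it with `tEnv.getConfig.setNullCheck(true)`), so the message should point users there. A minimal sketch of the check with the corrected wording, assuming hypothetical stand-ins for Calcite's `JoinRelType` values rather than the real enum:

```scala
object NullCheckSketch {
  // Hypothetical stand-ins for the JoinRelType cases matched in DataSetJoin.
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind

  // Outer joins can emit null-padded rows, so generated code must be null-aware.
  def requiresNullCheck(kind: JoinKind): Boolean = kind != Inner

  // Mirrors the check in the diff, with the message naming TableConfig.
  def validate(kind: JoinKind, nullCheckEnabled: Boolean): Either[String, JoinKind] =
    if (requiresNullCheck(kind) && !nullCheckEnabled)
      Left("Null check in TableConfig must be enabled for outer joins.")
    else Right(kind)
}
```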



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895471
  
    --- Diff: docs/apis/table.md ---
    @@ -423,14 +423,50 @@ Table result = in.groupBy("a").select("a, b.sum as d");
         <tr>
           <td><strong>Join</strong></td>
           <td>
    -        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.</p>
    +        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator.</p>
     {% highlight java %}
     Table left = tableEnv.fromDataSet(ds1, "a, b, c");
     Table right = tableEnv.fromDataSet(ds2, "d, e, f");
     Table result = left.join(right).where("a = d").select("a, b, e");
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    --- End diff --
    
    `FULL OUTER JOIN` should be `RIGHT OUTER JOIN`
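The distinction matters because the two joins keep different rows: a right outer join keeps every row of the right input and pads unmatched ones with nulls on the left side, while a full outer join additionally keeps unmatched left rows. A sketch on plain Scala collections, using hypothetical helpers (not Flink's `rightOuterJoin`/`fullOuterJoin` operators) and `None` for SQL NULL:

```scala
object OuterJoinKindsSketch {
  // Right outer join: every right row survives; unmatched ones get None on the left.
  def rightOuterJoin[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[(Option[L], R)] =
    right.flatMap { r =>
      val matches = left.filter(l => cond(l, r))
      val padded: Seq[(Option[L], R)] =
        if (matches.isEmpty) Seq((None, r)) else matches.map(l => (Some(l), r))
      padded
    }

  // Full outer join: the right outer join result plus unmatched left rows padded with None.
  def fullOuterJoin[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[(Option[L], Option[R])] = {
    val fromRight: Seq[(Option[L], Option[R])] =
      rightOuterJoin(left, right)(cond).map { case (l, r) => (l, Some(r)) }
    val unmatchedLeft: Seq[(Option[L], Option[R])] =
      left.filterNot(l => right.exists(r => cond(l, r))).map(l => (Some(l), None))
    fromRight ++ unmatchedLeft
  }
}
```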



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64895580
  
    --- Diff: docs/apis/table.md ---
    @@ -542,6 +578,42 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
     {% endhighlight %}
           </td>
         </tr>
    +    
    +    <tr>
    +      <td><strong>RightOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.rightOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    
    +    <tr>
    +      <td><strong>LeftOuterJoin</strong></td>
    +      <td>
    +        <p>Similar to a SQL FULL OUTER JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
    +{% highlight scala %}
    +val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
    +val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
    +val result = left.leftOuterJoin(right, 'a = 'd).select('a, 'b, 'e)
    --- End diff --
    
    `=` -> `===`
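For context: in Scala, `=` is a reserved word used for assignment and definitions, so it cannot be defined as a comparison method, and `'a = 'd` does not express an equality predicate. That is why the Scala expression DSL uses `===`. The toy DSL below (hypothetical `Field`/`EqualTo` classes, not the Table API's own) shows the pattern of a distinct method name that builds an expression node:

```scala
object EqualsDslSketch {
  // Hypothetical miniature expression DSL; not the Flink Table API classes.
  sealed trait Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class Field(name: String) extends Expr {
    // `=` is reserved in Scala, so equality predicates need a distinct operator name.
    def ===(other: Field): Expr = EqualTo(this, other)
  }
}
```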



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221570581
  
    Thanks for the update @dawidwys!
    The resolution/validation phase needs some rewriting and extension. Other than that, it looks good; the other comments are just minor issues.
    
    Thanks, Fabian



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221528310
  
    Yes, the failures are a known issue at the moment :-/ I hope this will be fixed soon.
    I'll run the tests later myself to check that the build passes.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64778778
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -298,11 +338,40 @@ case class Join(
         val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
         if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
           failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
    -    } else if (!ambiguousName.isEmpty) {
    +    } else if (ambiguousName.nonEmpty) {
           failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
         }
    +
    +    resolvedJoin.condition.foreach(testJoinCondition(_))
         resolvedJoin
       }
    +
    +  private def testJoinCondition(expression: Expression): Unit = {
    +    def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
    +        case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
    +          if x.isFromLeftInput != y.isFromLeftInput => Unit
    +        case _ => failValidation(
    +          s"Only join predicates supported. For non-join predicates use Table#where.")
    --- End diff --
    
    Add the condition that causes the failure to the error message.
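A minimal sketch of what that could look like, on a hypothetical expression model (`FieldRef`, `Literal`, and `Comparison` are illustrative names, not Flink's `Expression` subclasses). The check mirrors `checkIfJoinCondition` from the diff but reports the offending comparison:

```scala
object JoinPredicateCheckSketch {
  // Hypothetical expression model: a field reference records which join input it comes from.
  sealed trait Expr
  final case class FieldRef(name: String, fromLeft: Boolean) extends Expr {
    override def toString: String = name
  }
  final case class Literal(value: Any) extends Expr {
    override def toString: String = value.toString
  }
  final case class Comparison(op: String, left: Expr, right: Expr) extends Expr {
    override def toString: String = s"$left $op $right"
  }

  // Accepts a comparison only if it relates one field of each input;
  // otherwise the error message names the offending condition.
  def checkIfJoinCondition(exp: Comparison): Either[String, Comparison] = exp match {
    case Comparison(_, l: FieldRef, r: FieldRef) if l.fromLeft != r.fromLeft => Right(exp)
    case _ => Left(s"Only join predicates supported. For non-join predicates use Table#where. Found: $exp")
  }
}
```

In this model, a condition such as `a < 3` is rejected with a message that names it, instead of a generic error.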



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64728794
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -298,11 +333,34 @@ case class Join(
         val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
         if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
           failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
    -    } else if (!ambiguousName.isEmpty) {
    +    } else if (ambiguousName.nonEmpty) {
           failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
         }
    +
    +    resolvedJoin.condition.foreach(testJoinCondition(_))
         resolvedJoin
       }
    +
    +  private def testJoinCondition(expression: Expression): Unit = {
    +    def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
    +      failValidation(s"Only join predicates supported. For non-join predicates use Table#where.")
    +    }
    +
    +    var equiJoinFound = false
    +    def validateConditions(exp: Expression) : Unit = exp match {
    --- End diff --
    
    I am not sure whether the normalization to CNF is necessary. Calcite does not normalize; it only searches the subtrees of `AND` operators for equi-conditions. For example, after transforming the condition (l.a = r.b AND l.c = r.d) OR (l.a = r.b AND l.e = r.f) to CNF, it is possible to find the equi-condition l.a = r.b, but Calcite does not find it (see `RelOptUtil#splitJoinCondition`), which results in a failing DataSet join.
    
    My proposal would be:
    
    1. traverse the whole tree (both `OR` and `AND` branches), checking that only join conditions exist
    2. check that an equi-condition exists in an `AND` branch
    
    What do you think, @fhueske?
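A minimal sketch of the two-step proposal on a hypothetical expression model (not Flink's `Expression` classes; `And`, `Or`, `Cmp`, and `validate` are illustrative names). Under this rule, the example condition above is rejected at validation time instead of failing later in the DataSet join, and the error message includes the whole predicate:

```scala
object EquiJoinValidationSketch {
  // Hypothetical expression model for join conditions.
  sealed trait Expr
  final case class FieldRef(name: String, fromLeft: Boolean) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  final case class Cmp(op: String, left: Expr, right: Expr) extends Expr

  def validate(cond: Expr): Either[String, Unit] = {
    // Step 1: every comparison, under AND or OR, must relate one left and one right field.
    // Step 2: only equalities reached exclusively through And nodes count as equi-join keys.
    def go(e: Expr, onlyAnd: Boolean): Either[String, Boolean] = e match {
      case And(l, r) => for { a <- go(l, onlyAnd); b <- go(r, onlyAnd) } yield a || b
      case Or(l, r) => for { a <- go(l, onlyAnd = false); b <- go(r, onlyAnd = false) } yield a || b
      case Cmp(op, l: FieldRef, r: FieldRef) if l.fromLeft != r.fromLeft => Right(onlyAnd && op == "=")
      case other => Left(s"Not a join predicate: $other")
    }
    go(cond, onlyAnd = true).flatMap { found =>
      if (found) Right(()) else Left(s"At least one equi-join required: $cond")
    }
  }
}
```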



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64778861
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -298,11 +338,40 @@ case class Join(
         val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
         if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
           failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
    -    } else if (!ambiguousName.isEmpty) {
    +    } else if (ambiguousName.nonEmpty) {
           failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
         }
    +
    +    resolvedJoin.condition.foreach(testJoinCondition(_))
         resolvedJoin
       }
    +
    +  private def testJoinCondition(expression: Expression): Unit = {
    +    def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
    +        case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
    +          if x.isFromLeftInput != y.isFromLeftInput => Unit
    +        case _ => failValidation(
    +          s"Only join predicates supported. For non-join predicates use Table#where.")
    +      }
    +
    +    var equiJoinFound = false
    +    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
    +      case x: And => x.children.foreach(validateConditions(_, isAndBranch))
    +      case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
    +      case x: EqualTo =>
    +        if (isAndBranch) {
    +          equiJoinFound = true
    +        }
    +        checkIfJoinCondition(x)
    +      case x: BinaryComparison => checkIfJoinCondition(x)
    +      case x => failValidation(s"Unsupported condition type: ${x.getClass.getSimpleName}.")
    +    }
    +
    +    validateConditions(expression, isAndBranch = true)
    +    if (!equiJoinFound) {
    +      failValidation(s"At least one equi-join required.")
    --- End diff --
    
    Add the whole join predicate to the error message?



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221812986
  
    Hi @dawidwys! Thanks for the fast update! It looks mostly good. I suggested one simplification for `JoinFieldReference`, and the validation needs to be improved a bit. Let me know if you have questions.
    
    Thanks, Fabian



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64566174
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,60 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: RelNode,
    +    right: RelNode) extends Attribute {
    +
    +    override def toString = s"'$name"
    +
    +    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +      val joinInputField = if (left.getRowType.getFieldNames.contains(name)) {
    +        val field = left.getRowType.getField(name, false, false)
    +        (field.getIndex, field.getType)
    +      } else {
    +        val field = right.getRowType.getField(name, false, false)
    +        (field.getIndex + left.getRowType.getFieldCount, field.getType)
    +      }
    +
    +      new RexInputRef(joinInputField._1, joinInputField._2)
    +    }
    +
    +    override def withName(newName: String): Attribute = {
    +      if (newName == name) {
    +        this
    +      } else {
    +        JoinFieldReference(newName, resultType, left, right)
    +      }
         }
       }
     
       override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    -    joinType match {
    -      case JoinType.INNER =>
    -        left.construct(relBuilder)
    -        right.construct(relBuilder)
    -        relBuilder.join(JoinRelType.INNER,
    -          condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)))
    -      case _ =>
    -        throw new ValidationException(s"Unsupported JoinType: $joinType")
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    val partialFunction: PartialFunction[Expression, Expression] = {
    --- End diff --
    
    I think the resolution into `JoinFieldReference` should happen in the `resolveExpression()` method / phase and not in `construct()`. See the docs in `LogicalNode`. What do you think @yjshen?
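Roughly, the idea is that when expressions are resolved, each field name is looked up in both inputs once and turned into a reference that already knows its side and its position in the joined row, so `construct()` only has to translate it. A sketch in plain Scala; `ResolvedField` and `resolve` are hypothetical names, not methods of `LogicalNode`. Right-side indices are shifted by the number of left fields, as in the diff's `toRexNode`:

```scala
object JoinFieldResolutionSketch {
  // Hypothetical resolved reference: position in the concatenated (left ++ right) row.
  final case class ResolvedField(name: String, index: Int, fromLeft: Boolean)

  // Resolve a name against both join inputs once, before plan construction.
  def resolve(name: String, leftFields: Seq[String], rightFields: Seq[String]): Either[String, ResolvedField] =
    (leftFields.indexOf(name), rightFields.indexOf(name)) match {
      case (i, -1) if i >= 0 => Right(ResolvedField(name, i, fromLeft = true))
      case (-1, j) if j >= 0 => Right(ResolvedField(name, j + leftFields.size, fromLeft = false))
      case (-1, -1) => Left(s"Cannot resolve field [$name]")
      case _ => Left(s"Ambiguous field [$name] in both join inputs")
    }
}
```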



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221875998
  
    I applied your changes, @fhueske. If you have any comments regarding the equi-join validation, I will address them in the evening.



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221901395
  
    I am not sure whether you have seen my comment (it was made on a previous commit version). Let me paste my doubts about CNF once more:
    
    > I am not sure whether the normalization to CNF is necessary. Calcite does not normalize; it only searches the subtrees of AND operators for equi-conditions. For example, after transforming the condition (l.a = r.b AND l.c = r.d) OR (l.a = r.b AND l.e = r.f) to CNF, it is possible to find the equi-condition l.a = r.b, but Calcite does not find it (see RelOptUtil#splitJoinCondition), which results in a failing DataSet join.
    > 
    > My proposal would be:
    > 
    > 1. traverse the whole tree (both OR and AND branches), checking that only join conditions exist
    > 2. check that an equi-condition exists in an AND branch




[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-219172614
  
    Hi @dawidwys, 
    the runtime code, rule, API methods, and the tests look very good. :-)
    
    PR #1958 should be ready to be merged (waiting for @twalthr to give his OK). I think you can rebase your code on top of #1958. 
    
    One last comment regarding style fixes. Unfortunately, we do not have a strict, automatically enforced code style, and contributors sometimes follow different styles. We try to keep style changes in PRs to a minimum. Some changes make absolute sense, but others might be reverted by the next person going over the file. Also, style changes can distract from the important changes. There is no hard rule for what to change, but a good rule of thumb is to leave code as it is if in doubt.
    
    Thanks, Fabian



[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1981#issuecomment-221899756
  
    Hi @dawidwys, the equi-join check is a bit more involved. I implemented a CNF converter based on the algorithm described here: http://cs.jhu.edu/~jason/tutorials/convert-to-CNF (see my [branch](https://github.com/fhueske/flink/tree/tableOuter)).
    You can try to implement the algorithm as well. Alternatively, we can use my version.
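For illustration only, a minimal sketch of CNF conversion in the spirit of the linked tutorial, restricted to negation-free AND/OR trees over opaque atoms (the tutorial also handles negation, which this sketch omits). `CnfSketch`, `toCnf`, and `unitAtoms` are hypothetical names, not the code in the linked branch. Representing clauses as sets removes duplicates, so for the condition `(l.a = r.b AND l.c = r.d) OR (l.a = r.b AND l.e = r.f)` discussed in this thread, `l.a = r.b` ends up as a clause of its own, i.e., a usable equi-join key:

```scala
object CnfSketch {
  // Hypothetical negation-free boolean expression over opaque atoms (e.g. "l.a = r.b").
  sealed trait BoolExpr
  final case class Atom(text: String) extends BoolExpr
  final case class And(left: BoolExpr, right: BoolExpr) extends BoolExpr
  final case class Or(left: BoolExpr, right: BoolExpr) extends BoolExpr

  // CNF as a set of clauses; each clause is a set of atoms joined by OR.
  type Clause = Set[Atom]
  type Cnf = Set[Clause]

  def toCnf(e: BoolExpr): Cnf = e match {
    case a: Atom => Set(Set(a))
    // AND of two CNFs: take the clauses of both sides.
    case And(l, r) => toCnf(l) ++ toCnf(r)
    // OR distributes over AND: each left clause is OR-ed with each right clause.
    case Or(l, r) => for (cl <- toCnf(l); cr <- toCnf(r)) yield cl ++ cr
  }

  // Single-atom clauses must hold on their own, so they are candidates for join keys.
  def unitAtoms(cnf: Cnf): Set[Atom] = cnf.collect { case c if c.size == 1 => c.head }
}
```

Note that distributing OR over AND can grow the expression exponentially with the number of OR-ed conjunctions, which may be worth bounding in a real implementation.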
    
    Another thing we need to fix is the documentation, i.e., extending the Table API description and removing the outer join restriction in the SQL section in `docs/apis/table.md`.

