Posted to issues@flink.apache.org by xccui <gi...@git.apache.org> on 2017/10/17 04:22:23 UTC

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/4842

    [FLINK-7853] [table] Reject table function outer joins with predicates in table API

    # What is the purpose of the change
    
    Because of CALCITE-2004, this PR temporarily restricts the predicate of a table function left outer join to be either empty or the literal `true`.
    
    ## Brief change log
    
      - Allow boolean literals in join conditions.
      - Restrict the table function left outer join predicates to be empty or literal true in `operators.scala`.
      - Add related tests.
      - Refine the documents.
    
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `CorrelateTest` and `CorrelateITCase`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-7853

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4842.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4842
    
----
commit 8fa458ffa94062f3e724908b67d75469cebf74b0
Author: Xingcan Cui <xi...@gmail.com>
Date:   2017-10-16T08:49:14Z

    [FLINK-7853] [table] Reject table function outer joins with predicates in Table API

----


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145257154
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -476,20 +481,30 @@ case class Join(
             } else {
               nonEquiJoinPredicateFound = true
             }
    +      // The boolean literal should be valid condition type.
    +      case x: Literal if x.resultType == Types.BOOLEAN =>
           case x => failValidation(
             s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
         }
     
         validateConditions(expression, isAndBranch = true)
    -    if (!equiJoinPredicateFound) {
    -      failValidation(
    -        s"Invalid join condition: $expression. At least one equi-join predicate is " +
    -          s"required.")
    -    }
    -    if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
    -      failValidation(
    -        s"Invalid join condition: $expression. Non-equality join predicates or local" +
    -          s" predicates are not supported in outer joins.")
    +
    +    // Due to CALCITE-2004, we cannot accept join predicates except literal true for TableFunction
    +    // left outer join.
    +    if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
    +      if (!alwaysTrue) failValidation("TableFunction left outer join predicates can only be " +
    --- End diff --
    
    `predicates` -> `predicate`
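
The rule introduced in the hunk above can be sketched in isolation. This is a minimal illustration only, not the code in `operators.scala`: the types `JoinType`, `Condition`, `NoCondition`, `BoolLiteral` and `Comparison`, the object `TableFunctionJoinCheck`, and the error text are all hypothetical stand-ins, and the `correlated` flag from the diff is folded into `rightIsTableFunction` for brevity.

```scala
// Hypothetical, simplified stand-ins for the planner types; not Flink's real classes.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType

sealed trait Condition
case object NoCondition extends Condition                    // join called without a predicate
final case class BoolLiteral(value: Boolean) extends Condition
final case class Comparison(text: String) extends Condition  // any other predicate, e.g. 'a === 'l

object TableFunctionJoinCheck {
  /** Returns an error message if the join must be rejected, otherwise None. */
  def validate(
      joinType: JoinType,
      rightIsTableFunction: Boolean,
      cond: Condition): Option[String] = {
    // Only a missing predicate or the boolean literal `true` counts as always true.
    val alwaysTrue = cond match {
      case NoCondition | BoolLiteral(true) => true
      case _ => false
    }
    if (rightIsTableFunction && joinType != Inner && !alwaysTrue) {
      // Illustrative wording; the real message is truncated in the diff above.
      Some("Table function outer join predicate must be empty or literal true.")
    } else {
      None
    }
  }
}
```

Under these assumptions, `validate(LeftOuter, true, Comparison("a === l"))` yields an error, while the same predicate on an inner join, or a left outer join with `NoCondition` or `BoolLiteral(true)`, passes.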


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145253567
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -687,6 +689,7 @@ val result: Table = table
             <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
         	<td>
             <p>Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.
    +        <p><b>Note:</b> Currently the predicates for table function left outer join can only be empty or literal <code>true</code>.</p>
    --- End diff --
    
    -> `Currently, the predicate of a table function left outer join can only be empty or literal <code>true</code>.`


---

[GitHub] flink issue #4842: [FLINK-7853] [table] Reject table function outer joins wi...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4842
  
    Thanks for the review, @fhueske. I've updated the code and created a JIRA issue, FLINK-7865, to track the resolution of CALCITE-2004.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145345140
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    --- End diff --
    
    I see, but let's rather add a JIRA issue for FLINK and link to that one (which links to the Calcite JIRA and the discussion on the Calcite dev list).


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145260893
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
    @@ -82,6 +82,42 @@ class CorrelateITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    --- End diff --
    
    The validation has been checked before. We don't need an ITCase for this.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145254351
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -695,6 +698,7 @@ val split: TableFunction[_] = new MySplitUDTF()
     // join
     val result: Table = table
         .leftOuterJoin(split('c) as ('s, 't, 'v))
    +    .where('a > 5)
    --- End diff --
    
    I would not add this. It's a local predicate on the outer table and not related to the table function join.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145260098
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala ---
    @@ -73,13 +74,17 @@ class CorrelateTest extends TableTestBase {
         util.verifyTable(result2, expected2)
       }
     
    +  /**
    --- End diff --
    
    Please remove this comment. The test does not include a corresponding query.


---

[GitHub] flink issue #4842: [FLINK-7853] [table] Reject table function outer joins wi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4842
  
    Thanks for the update @xccui. The PR looks good!
    I will merge it.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145298482
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
    @@ -82,6 +82,42 @@ class CorrelateITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    --- End diff --
    
    This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest keeping it for later use; otherwise we may not be aware in the future that the test is incomplete.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145260567
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala ---
    @@ -93,9 +98,25 @@ class CorrelateTest extends TableTestBase {
               "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
             term("joinType", "LEFT")
           ),
    -      term("select", "c", "s")
    +      term("select", "c", "s"),
    +      term("where", ">(s, '')")
         )
     
         util.verifyTable(result, expected)
       }
    +
    +  /**
    +    * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
    +    * join predicates can only be empty or literal true.
    +    */
    +  @Test (expected = classOf[ValidationException])
    --- End diff --
    
    You can add a test for `leftOuterJoin(function(...) as 'x, true)` if you want to.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145299001
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    --- End diff --
    
    This is the same as in the batch case.
    > This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest to keep it for later use, otherwise we may not be aware the test is incomplete in the future.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4842


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145253981
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -583,7 +584,8 @@ tEnv.registerFunction("split", split);
     // join
     Table orders = tableEnv.scan("Orders");
     Table result = orders
    -    .leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v")))
    +    .leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v"))
    +    .where("a > 5")
    --- End diff --
    
    I would not add this. It's a local predicate on the outer table and not related to the table function join.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145261361
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
    @@ -82,6 +82,42 @@ class CorrelateITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
    +    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
    +
    +    val func2 = new TableFunc2
    +    val result = in
    +      .leftOuterJoin(func2('c) as ('s, 'l), 'a === 'l)
    +      .select('c, 's, 'l)
    +      .toDataSet[Row]
    +    val results = result.collect()
    +    val expected = "John#19,19,2\n" + "nosharp,null,null"
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testLeftOuterJoinWithWhere(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
    +    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
    +
    +    val func2 = new TableFunc2
    +    val result = in
    +      .leftOuterJoin(func2('c) as ('s, 'l), true)
    +      .where('a >= 'l) // The where clause should be evaluated after the join.
    --- End diff --
    
    Actually, it can be evaluated before the join because it is on the outer side and not on the attributes of the table function. The `testWithFilter()` covers the interesting case (`where` and `filter` are identical).
    
    I think we can remove this test method.
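
The difference between a predicate passed to `leftOuterJoin` and a `where` applied to the join result, which underlies several comments in this thread, can be sketched on plain Scala collections. This is a rough model under stated assumptions, not Flink code: `Row`, `tokens`, `joinWithPredicate` and `joinThenWhere` are hypothetical names, `tokens` stands in for an arbitrary table function, and `None` plays the role of SQL null padding.

```scala
object OuterJoinSketch {
  final case class Row(a: Int, c: String)

  // Hypothetical table function: one output value per '#'-separated token after the first.
  def tokens(c: String): Seq[String] = c.split("#").toSeq.drop(1)

  /** Left outer join with the predicate evaluated inside the join:
    * an outer row without a surviving match is kept and padded with None. */
  def joinWithPredicate(
      rows: Seq[Row],
      p: (Row, String) => Boolean): Seq[(Row, Option[String])] =
    rows.flatMap { r =>
      val matches = tokens(r.c).filter(t => p(r, t))
      if (matches.isEmpty) Seq((r, None)) else matches.map(t => (r, Some(t)))
    }

  /** Left outer join without a predicate, followed by a filter on the result:
    * padded rows cannot satisfy a predicate on the function output and are dropped. */
  def joinThenWhere(
      rows: Seq[Row],
      p: (Row, String) => Boolean): Seq[(Row, Option[String])] =
    joinWithPredicate(rows, (_, _) => true).filter { case (r, t) => t.exists(p(r, _)) }
}
```

With the rows `Row(19, "John#19")`, `Row(22, "John#22")` and `Row(5, "nosharp")` and a predicate that accepts only the token `"19"`, `joinWithPredicate` keeps all three outer rows (two of them padded), whereas `joinThenWhere` keeps only the first. A `where` on table function attributes is therefore not a drop-in replacement for the rejected join predicate.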


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145259863
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala ---
    @@ -93,9 +98,25 @@ class CorrelateTest extends TableTestBase {
               "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
             term("joinType", "LEFT")
           ),
    -      term("select", "c", "s")
    +      term("select", "c", "s"),
    +      term("where", ">(s, '')")
         )
     
         util.verifyTable(result, expected)
       }
    +
    +  /**
    +    * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
    +    * join predicates can only be empty or literal true.
    +    */
    +  @Test (expected = classOf[ValidationException])
    --- End diff --
    
    There is a sub-package `validation` that contains tests that verify correct validation. Please move this test method into a new `CorrelateValidationTest` there.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145262038
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    +    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
    +    val func0 = new TableFunc0
    +
    +    val result = t
    +      .leftOuterJoin(func0('c) as ('s, 'l), 'a === 'l)
    +      .select('c, 's, 'l)
    +      .toAppendStream[Row]
    +
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = "John#19,null,null\n" + "John#22,null,null\n" + "Anna44,null,null\n" +
    +      "nosharp,null,null"
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testLeftOuterJoinWithWhere(): Unit = {
    --- End diff --
    
    `testUserDefinedTableFunctionWithScalarFunction()` covers the same case (a predicate on a table function attribute). 
    
    I think we can remove this test.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145260250
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala ---
    @@ -99,6 +104,21 @@ class CorrelateTest extends TableTestBase {
         util.verifyTable(result, expected)
       }
     
    +  /**
    +    * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
    +    * join predicates can only be empty or literal true.
    +    */
    +  @Test (expected = classOf[ValidationException])
    --- End diff --
    
    Please move this test method to `validation.CorrelateValidationTest`.


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145253439
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -574,6 +574,7 @@ Table result = orders
           </td>
           <td>
             <p>Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.
    +        <p><b>Note:</b> Currently the predicates for table function left outer join can only be empty or literal <code>true</code>.</p>
    --- End diff --
    
    -> `Currently, the predicate of a table function left outer join can only be empty or literal <code>true</code>.`


---

[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145260916
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  /**
    +    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
    +    */
    +  @Test (expected = classOf[ValidationException])
    +  def testLeftOuterJoinWithPredicates(): Unit = {
    --- End diff --
    
    The validation has been checked before. We don't need an ITCase for this.


---