You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/19 14:07:03 UTC

[flink] branch release-1.9 updated: [FLINK-13321][table-planner-blink] Fix lateral join udtf with constant parameters doesn't work in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3566e8c  [FLINK-13321][table-planner-blink] Fix lateral join udtf with constant parameters doesn't work in blink planner
3566e8c is described below

commit 3566e8c56ddd81ed819017f35d9f3f2c5049077f
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Jul 18 19:38:28 2019 +0800

    [FLINK-13321][table-planner-blink] Fix lateral join udtf with constant parameters doesn't work in blink planner
    
    This closes #9162
---
 .../logical/FlinkLogicalTableFunctionScan.scala    | 50 +------------
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  1 +
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  1 +
 .../BatchExecConstantTableFunctionScanRule.scala   | 85 ++++++++++++++++++++++
 .../StreamExecConstantTableFunctionScanRule.scala  | 85 ++++++++++++++++++++++
 .../runtime/batch/table/CorrelateITCase.scala      |  5 +-
 .../table/runtime/stream/sql/CorrelateITCase.scala | 24 ++++++
 .../runtime/stream/table/CorrelateITCase.scala     | 31 ++++++++
 8 files changed, 230 insertions(+), 52 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 41074bf..b762779 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -22,16 +22,14 @@ import org.apache.flink.table.functions.TemporalTableFunction
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
 
-import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
+import org.apache.calcite.rel.core.TableFunctionScan
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode, RexUtil}
-import org.apache.calcite.util.ImmutableBitSet
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 import java.lang.reflect.Type
 import java.util
@@ -109,17 +107,6 @@ class FlinkLogicalTableFunctionScanConverter
     val scan = rel.asInstanceOf[LogicalTableFunctionScan]
     val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
 
-    val constantTableFunction = RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
-    if (constantTableFunction) {
-      convertConstantFunctionTableScan(scan, traitSet)
-    } else {
-      createFlinkLogicalTableScan(scan, traitSet)
-    }
-  }
-
-  def createFlinkLogicalTableScan(
-      scan: LogicalTableFunctionScan,
-      traitSet: RelTraitSet): FlinkLogicalTableFunctionScan = {
     new FlinkLogicalTableFunctionScan(
       scan.getCluster,
       traitSet,
@@ -131,39 +118,6 @@ class FlinkLogicalTableFunctionScanConverter
     )
   }
 
-  /**
-    * Converts [[LogicalTableFunctionScan]] with constant RexCall to
-    * {{{
-    *                    [[FlinkLogicalCorrelate]]
-    *                          /       \
-    * empty [[FlinkLogicalValues]]  [[FlinkLogicalTableFunctionScan]]
-    * }}}
-    */
-  def convertConstantFunctionTableScan(
-      scan: LogicalTableFunctionScan,
-      traitSet: RelTraitSet): RelNode = {
-    val cluster = scan.getCluster
-
-    // create correlate left
-    val values = new FlinkLogicalValues(
-      cluster,
-      traitSet,
-      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()),
-      ImmutableList.of(ImmutableList.of[RexLiteral]())
-    )
-
-    // create correlate right
-    val newScan = createFlinkLogicalTableScan(scan, traitSet)
-
-    new FlinkLogicalCorrelate(
-      cluster,
-      traitSet,
-      values,
-      newScan,
-      cluster.createCorrel(), // a dummy CorrelationId
-      ImmutableBitSet.of(),
-      JoinRelType.INNER)
-  }
 }
 
 object FlinkLogicalTableFunctionScan {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 88ddb3d..6538ca6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -396,6 +396,7 @@ object FlinkBatchRuleSets {
     BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
     BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
     // correlate
+    BatchExecConstantTableFunctionScanRule.INSTANCE,
     BatchExecCorrelateRule.INSTANCE,
     // sink
     BatchExecSinkRule.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index d095f76..31b27ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -378,6 +378,7 @@ object FlinkStreamRuleSets {
     // CEP
     StreamExecMatchRule.INSTANCE,
     // correlate
+    StreamExecConstantTableFunctionScanRule.INSTANCE,
     StreamExecCorrelateRule.INSTANCE,
     // sink
     StreamExecSinkRule.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
new file mode 100644
index 0000000..808affa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.batch
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.nodes.physical.batch.{BatchExecCorrelate, BatchExecValues}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexLiteral, RexUtil}
+
+/**
+  * Converts an input-less [[FlinkLogicalTableFunctionScan]] whose RexCall is constant to
+  * {{{
+  *                    [[BatchExecCorrelate]]
+  *                          /       \
+  * empty [[BatchExecValues]]  [[FlinkLogicalTableFunctionScan]]
+  * }}}
+  *
+  * This rule allows selecting from a table function (UDTF) directly, such as:
+  * SELECT * FROM LATERAL TABLE(func()) as T(c)
+  *
+  * Note: [[BatchExecCorrelateRule]] is responsible for generating the physical plan of
+  * ordinary correlate queries, such as:
+  * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c)
+  * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)
+  */
+class BatchExecConstantTableFunctionScanRule
+  extends RelOptRule(
+    operand(classOf[FlinkLogicalTableFunctionScan], any),
+    "BatchExecConstantTableFunctionScanRule") {
+  /** Fires only for a scan that has no inputs and whose call is a constant expression. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+    RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+    val cluster = scan.getCluster
+    val traitSet = call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    // Left input of the correlate: one row with zero columns (empty struct type), so the
+    // constant table function on the right side is paired with a single dummy row.
+    val values = new BatchExecValues(
+      cluster,
+      traitSet,
+      ImmutableList.of(ImmutableList.of[RexLiteral]()),
+      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()))
+
+    val correlate = new BatchExecCorrelate(
+      cluster,
+      traitSet,
+      values,
+      scan,
+      None,
+      None,
+      scan.getRowType, // the Values side contributes no columns
+      JoinRelType.INNER)
+    call.transformTo(correlate)
+  }
+
+}
+
+object BatchExecConstantTableFunctionScanRule {
+  val INSTANCE = new BatchExecConstantTableFunctionScanRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
new file mode 100644
index 0000000..711a46a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecCorrelate, StreamExecValues}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexLiteral, RexUtil}
+
+/**
+  * Rewrites an input-less [[FlinkLogicalTableFunctionScan]] whose RexCall is constant into
+  * {{{
+  *                    [[StreamExecCorrelate]]
+  *                          /       \
+  * empty [[StreamExecValues]]  [[FlinkLogicalTableFunctionScan]]
+  * }}}
+  *
+  * With this rule a table function (UDTF) can be queried on its own, e.g.:
+  * SELECT * FROM LATERAL TABLE(func()) as T(c)
+  *
+  * Ordinary correlate queries, such as the following, are converted by
+  * [[StreamExecCorrelateRule]]:
+  * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c)
+  * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)
+  */
+class StreamExecConstantTableFunctionScanRule
+  extends RelOptRule(
+    operand(classOf[FlinkLogicalTableFunctionScan], any),
+    "StreamExecConstantTableFunctionScanRule") {
+  /** Matches only scans without inputs whose table function call is a constant expression. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val tableFuncScan = call.rel[FlinkLogicalTableFunctionScan](0)
+    RexUtil.isConstant(tableFuncScan.getCall) && tableFuncScan.getInputs.isEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val tableFuncScan = call.rel[FlinkLogicalTableFunctionScan](0)
+    val cluster = tableFuncScan.getCluster
+    val physicalTraits = call.getPlanner.emptyTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    // Exactly one row with no columns, so the correlate has a single left row to pair with.
+    val singleEmptyRow = ImmutableList.of(ImmutableList.of[RexLiteral]())
+    val noColumns =
+      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of())
+    val emptyValues =
+      new StreamExecValues(cluster, physicalTraits, singleEmptyRow, noColumns)
+
+    // Right side is the original scan; the left side contributes no columns.
+    val correlate = new StreamExecCorrelate(
+      cluster,
+      physicalTraits,
+      emptyValues,
+      None,
+      tableFuncScan,
+      None,
+      tableFuncScan.getRowType,
+      JoinRelType.INNER)
+    call.transformTo(correlate)
+  }
+
+}
+
+object StreamExecConstantTableFunctionScanRule {
+  val INSTANCE = new StreamExecConstantTableFunctionScanRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 250cca5..61ad420 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -24,11 +24,10 @@ import org.apache.flink.table.api.{DataTypes, Table, ValidationException}
 import org.apache.flink.table.expressions.utils.{Func1, Func18, FuncWithOpen, RichFunc2}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, BatchTestBase, CollectionBatchExecTable, UserDefinedFunctionTestUtils}
-import org.apache.flink.table.util.DateTimeTestUtil._
 import org.apache.flink.table.util._
 import org.apache.flink.test.util.TestBaseUtils
 
-import org.junit.{Assert, Ignore, Test}
+import org.junit.{Assert, Test}
 
 import java.sql.{Date, Timestamp}
 
@@ -289,8 +288,6 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Add a rule to translate a Correlate without correlateSets to Join!")
   @Test
   def testTableFunctionWithVariableArguments(): Unit = {
     val varArgsFunc0 = new VarArgsFunc0
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
index ee9d795..bf18cb9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
@@ -85,6 +85,30 @@ class CorrelateITCase extends StreamingTestBase {
   }
 
   @Test
+  def testConstantTableFunc(): Unit = {
+    tEnv.registerFunction("str_split", new StringSplit())
+    val sql = "SELECT * FROM LATERAL TABLE(str_split()) as T0(d)"
+    val appendSink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(appendSink)
+    env.execute()
+    // Constant UDTF call without an outer table; compare ignoring row order.
+    val expectedRows = List("a", "b", "c")
+    assertEquals(expectedRows.sorted, appendSink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testConstantTableFunc2(): Unit = {
+    tEnv.registerFunction("str_split", new StringSplit())
+    val sql = "SELECT * FROM LATERAL TABLE(str_split('Jack,John', ',')) as T0(d)"
+    val appendSink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(appendSink)
+    env.execute()
+    // Constant UDTF call with literal arguments; compare ignoring row order.
+    val expectedRows = List("Jack", "John")
+    assertEquals(expectedRows.sorted, appendSink.getAppendResults.sorted)
+  }
+
+  @Test
   def testUdfIsOpenedAfterUdtf(): Unit = {
     val data = List(
       (1, 2, "abc-bcd"),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index a42e255..98ee243 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -231,6 +231,37 @@ class CorrelateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
       "nosharp,2",
       "nosharp,nosharp")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
+
+    val result1 = testData(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('c)
+      .joinLateral(varArgsFunc0("1", "2"))
+
+    val sink1 = new TestingAppendSink
+    result1.toAppendStream[Row].addSink(sink1)
+    env.execute()
+
+    val expected1 = mutable.MutableList(
+      "Anna#44,1",
+      "Anna#44,2",
+      "Jack#22,1",
+      "Jack#22,2",
+      "John#19,1",
+      "John#19,2",
+      "nosharp,1",
+      "nosharp,2")
+    assertEquals(expected1.sorted, sink1.getAppendResults.sorted)
+
+    // Test for empty cases
+    val result2 = testData(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('c)
+      .joinLateral(varArgsFunc0())
+
+    val sink2 = new TestingAppendSink
+    result2.toAppendStream[Row].addSink(sink2)
+    env.execute()
+    assertTrue(sink2.getAppendResults.isEmpty)
   }
 
   @Test