You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/19 14:00:54 UTC
[flink] branch master updated: [FLINK-13321][table-planner-blink]
Fix lateral join udtf with constant parameters doesn't work in blink
planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 869a807 [FLINK-13321][table-planner-blink] Fix lateral join udtf with constant parameters doesn't work in blink planner
869a807 is described below
commit 869a8072869e8fc4e83f0e376e901a97940b4b82
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Jul 18 19:38:28 2019 +0800
[FLINK-13321][table-planner-blink] Fix lateral join udtf with constant parameters doesn't work in blink planner
This closes #9162
---
.../logical/FlinkLogicalTableFunctionScan.scala | 50 +------------
.../table/plan/rules/FlinkBatchRuleSets.scala | 1 +
.../table/plan/rules/FlinkStreamRuleSets.scala | 1 +
.../BatchExecConstantTableFunctionScanRule.scala | 85 ++++++++++++++++++++++
.../StreamExecConstantTableFunctionScanRule.scala | 85 ++++++++++++++++++++++
.../runtime/batch/table/CorrelateITCase.scala | 5 +-
.../table/runtime/stream/sql/CorrelateITCase.scala | 24 ++++++
.../runtime/stream/table/CorrelateITCase.scala | 31 ++++++++
8 files changed, 230 insertions(+), 52 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 41074bf..b762779 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -22,16 +22,14 @@ import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.FlinkConventions
-import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
+import org.apache.calcite.rel.core.TableFunctionScan
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode, RexUtil}
-import org.apache.calcite.util.ImmutableBitSet
+import org.apache.calcite.rex.{RexCall, RexNode}
import java.lang.reflect.Type
import java.util
@@ -109,17 +107,6 @@ class FlinkLogicalTableFunctionScanConverter
val scan = rel.asInstanceOf[LogicalTableFunctionScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
- val constantTableFunction = RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
- if (constantTableFunction) {
- convertConstantFunctionTableScan(scan, traitSet)
- } else {
- createFlinkLogicalTableScan(scan, traitSet)
- }
- }
-
- def createFlinkLogicalTableScan(
- scan: LogicalTableFunctionScan,
- traitSet: RelTraitSet): FlinkLogicalTableFunctionScan = {
new FlinkLogicalTableFunctionScan(
scan.getCluster,
traitSet,
@@ -131,39 +118,6 @@ class FlinkLogicalTableFunctionScanConverter
)
}
- /**
- * Converts [[LogicalTableFunctionScan]] with constant RexCall to
- * {{{
- * [[FlinkLogicalCorrelate]]
- * / \
- * empty [[FlinkLogicalValues]] [[FlinkLogicalTableFunctionScan]]
- * }}}
- */
- def convertConstantFunctionTableScan(
- scan: LogicalTableFunctionScan,
- traitSet: RelTraitSet): RelNode = {
- val cluster = scan.getCluster
-
- // create correlate left
- val values = new FlinkLogicalValues(
- cluster,
- traitSet,
- cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()),
- ImmutableList.of(ImmutableList.of[RexLiteral]())
- )
-
- // create correlate right
- val newScan = createFlinkLogicalTableScan(scan, traitSet)
-
- new FlinkLogicalCorrelate(
- cluster,
- traitSet,
- values,
- newScan,
- cluster.createCorrel(), // a dummy CorrelationId
- ImmutableBitSet.of(),
- JoinRelType.INNER)
- }
}
object FlinkLogicalTableFunctionScan {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 88ddb3d..6538ca6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -396,6 +396,7 @@ object FlinkBatchRuleSets {
BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
// correlate
+ BatchExecConstantTableFunctionScanRule.INSTANCE,
BatchExecCorrelateRule.INSTANCE,
// sink
BatchExecSinkRule.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index d095f76..31b27ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -378,6 +378,7 @@ object FlinkStreamRuleSets {
// CEP
StreamExecMatchRule.INSTANCE,
// correlate
+ StreamExecConstantTableFunctionScanRule.INSTANCE,
StreamExecCorrelateRule.INSTANCE,
// sink
StreamExecSinkRule.INSTANCE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
new file mode 100644
index 0000000..808affa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.batch
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.nodes.physical.batch.{BatchExecCorrelate, BatchExecValues}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexLiteral, RexUtil}
+
+/**
+ * Planner rule that converts a [[FlinkLogicalTableFunctionScan]] whose call is constant and
+ * which has no inputs to
+ * {{{
+ *                       [[BatchExecCorrelate]]
+ *                       /                  \
+ * single-row [[BatchExecValues]]    [[FlinkLogicalTableFunctionScan]]
+ * }}}
+ * The [[BatchExecValues]] node holds exactly one row with no columns, so the table function on
+ * the right side is evaluated against a single (empty) input row.
+ *
+ * The rule supports selecting from a table function (UDTF) directly, such as the following SQL:
+ * SELECT * FROM LATERAL TABLE(func()) as T0(c)
+ *
+ * Note: [[BatchExecCorrelateRule]] is responsible for converting a reasonable physical plan for
+ * the normal correlate query, such as the following SQL:
+ * example1: SELECT * FROM T, LATERAL TABLE(func()) as T0(c)
+ * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T0(c)
+ */
+class BatchExecConstantTableFunctionScanRule
+  extends RelOptRule(
+    operand(classOf[FlinkLogicalTableFunctionScan], any),
+    // The description equals the class name, in line with
+    // StreamExecConstantTableFunctionScanRule.
+    "BatchExecConstantTableFunctionScanRule") {
+
+  /**
+   * Matches only a table function scan whose call is constant (per `RexUtil.isConstant`)
+   * and which has no input relations.
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+    RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+  }
+
+  /**
+   * Replaces the matched scan with a [[BatchExecCorrelate]] whose left input is a
+   * [[BatchExecValues]] with one zero-column row and whose right side is the scan itself.
+   */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+
+    // create correlate left: one tuple without any literal, typed as an empty struct
+    val cluster = scan.getCluster
+    val traitSet = call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    val values = new BatchExecValues(
+      cluster,
+      traitSet,
+      ImmutableList.of(ImmutableList.of[RexLiteral]()),
+      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()))
+
+    // The left side contributes no columns, so the correlate's row type is the scan's row type.
+    // NOTE(review): the two None arguments are presumably the optional join condition and
+    // projection program - confirm against the BatchExecCorrelate constructor.
+    val correlate = new BatchExecCorrelate(
+      cluster,
+      traitSet,
+      values,
+      scan,
+      None,
+      None,
+      scan.getRowType,
+      JoinRelType.INNER)
+    call.transformTo(correlate)
+  }
+
+}
+
+object BatchExecConstantTableFunctionScanRule {
+  // Shared rule instance.
+  val INSTANCE = new BatchExecConstantTableFunctionScanRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
new file mode 100644
index 0000000..711a46a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecCorrelate, StreamExecValues}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexLiteral, RexUtil}
+
+/**
+ * Planner rule that converts a [[FlinkLogicalTableFunctionScan]] whose call is constant and
+ * which has no inputs to
+ * {{{
+ * [[StreamExecCorrelate]]
+ * / \
+ * single-row [[StreamExecValues]] [[FlinkLogicalTableFunctionScan]]
+ * }}}
+ * The [[StreamExecValues]] node holds exactly one row with no columns.
+ *
+ * The rule supports selecting from a table function (UDTF) directly, such as the following SQL:
+ * SELECT * FROM LATERAL TABLE(func()) as T0(c)
+ *
+ * Note: [[StreamExecCorrelateRule]] is responsible for converting a reasonable physical plan for
+ * the normal correlate query, such as the following SQL:
+ * example1: SELECT * FROM T, LATERAL TABLE(func()) as T0(c)
+ * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T0(c)
+ */
+class StreamExecConstantTableFunctionScanRule
+ extends RelOptRule(
+ operand(classOf[FlinkLogicalTableFunctionScan], any),
+ "StreamExecConstantTableFunctionScanRule") {
+ /**
+ * Matches only a table function scan whose call is constant (per `RexUtil.isConstant`)
+ * and which has no input relations.
+ */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+ RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+ }
+ /**
+ * Replaces the matched scan with a [[StreamExecCorrelate]] whose left input is a
+ * [[StreamExecValues]] with one zero-column row; the scan itself is passed on unchanged.
+ */
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val scan: FlinkLogicalTableFunctionScan = call.rel(0)
+
+ // create correlate left: one tuple without any literal, typed as an empty struct
+ val cluster = scan.getCluster
+ val traitSet = call.getPlanner.emptyTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+ val values = new StreamExecValues(
+ cluster,
+ traitSet,
+ ImmutableList.of(ImmutableList.of[RexLiteral]()),
+ cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()))
+ /*
+ * The correlate reuses the scan's row type as its output row type.
+ * NOTE(review): the two None arguments are presumably the optional projection program and
+ * join condition - confirm against the StreamExecCorrelate constructor.
+ */
+ val correlate = new StreamExecCorrelate(
+ cluster,
+ traitSet,
+ values,
+ None,
+ scan,
+ None,
+ scan.getRowType,
+ JoinRelType.INNER)
+ call.transformTo(correlate)
+ }
+
+}
+
+object StreamExecConstantTableFunctionScanRule {
+ val INSTANCE = new StreamExecConstantTableFunctionScanRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 250cca5..61ad420 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -24,11 +24,10 @@ import org.apache.flink.table.api.{DataTypes, Table, ValidationException}
import org.apache.flink.table.expressions.utils.{Func1, Func18, FuncWithOpen, RichFunc2}
import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, BatchTestBase, CollectionBatchExecTable, UserDefinedFunctionTestUtils}
-import org.apache.flink.table.util.DateTimeTestUtil._
import org.apache.flink.table.util._
import org.apache.flink.test.util.TestBaseUtils
-import org.junit.{Assert, Ignore, Test}
+import org.junit.{Assert, Test}
import java.sql.{Date, Timestamp}
@@ -289,8 +288,6 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- // TODO
- @Ignore("Add a rule to translate a Correlate without correlateSets to Join!")
@Test
def testTableFunctionWithVariableArguments(): Unit = {
val varArgsFunc0 = new VarArgsFunc0
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
index ee9d795..bf18cb9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/CorrelateITCase.scala
@@ -85,6 +85,30 @@ class CorrelateITCase extends StreamingTestBase {
}
@Test
+ def testConstantTableFunc(): Unit = { // table function with no arguments and no left-hand table
+ tEnv.registerFunction("str_split", new StringSplit())
+ val query = "SELECT * FROM LATERAL TABLE(str_split()) as T0(d)"
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+ env.execute()
+ /*
+ * Both sides are sorted before comparing, so row order is not asserted. The expected rows are
+ * presumably what StringSplit emits when called without arguments (defined elsewhere; confirm).
+ */
+ val expected = List("a", "b", "c")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testConstantTableFunc2(): Unit = { // table function with literal arguments, no left-hand table
+ tEnv.registerFunction("str_split", new StringSplit())
+ val query = "SELECT * FROM LATERAL TABLE(str_split('Jack,John', ',')) as T0(d)"
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+ env.execute()
+ /*
+ * Both sides are sorted before comparing, so row order is not asserted. The expected rows are
+ * the pieces of 'Jack,John' around the ',' separator passed as the second argument.
+ */
+ val expected = List("Jack", "John")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
def testUdfIsOpenedAfterUdtf(): Unit = {
val data = List(
(1, 2, "abc-bcd"),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index a42e255..98ee243 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -231,6 +231,37 @@ class CorrelateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
"nosharp,2",
"nosharp,nosharp")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
+
+ val result1 = testData(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('c)
+ .joinLateral(varArgsFunc0("1", "2"))
+
+ val sink1 = new TestingAppendSink
+ result1.toAppendStream[Row].addSink(sink1)
+ env.execute()
+
+ val expected1 = mutable.MutableList(
+ "Anna#44,1",
+ "Anna#44,2",
+ "Jack#22,1",
+ "Jack#22,2",
+ "John#19,1",
+ "John#19,2",
+ "nosharp,1",
+ "nosharp,2")
+ assertEquals(expected1.sorted, sink1.getAppendResults.sorted)
+
+ // Test for empty cases
+ val result2 = testData(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('c)
+ .joinLateral(varArgsFunc0())
+
+ val sink2 = new TestingAppendSink
+ result2.toAppendStream[Row].addSink(sink2)
+ env.execute()
+ assertTrue(sink2.getAppendResults.isEmpty)
}
@Test