You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/01 03:30:56 UTC
[flink] 01/02: [FLINK-13347][table-planner] should handle SEMI/ANTI
JoinRelType in switch case
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 50c957adcff228f1390faa6823a72d93cf0cf92e
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 25 17:06:50 2019 +0800
[FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case
This closes #9227.
---
.../scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala | 2 +-
.../main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala | 2 ++
.../org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala | 1 +
.../nodes/datastream/DataStreamJoinToCoProcessTranslator.scala | 8 +++++---
.../flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala | 2 ++
.../org/apache/flink/table/runtime/join/WindowJoinUtil.scala | 4 ++--
6 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index bd2e8fa..923307c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -100,7 +100,7 @@ trait CommonCorrelate {
|}
|""".stripMargin
} else if (joinType != JoinRelType.INNER) {
- throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+ throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.")
}
functionGenerator.generateFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
index 3d98a4d..753ec40 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -45,6 +45,8 @@ trait CommonJoin {
case JoinRelType.LEFT=> "LeftOuterJoin"
case JoinRelType.RIGHT => "RightOuterJoin"
case JoinRelType.FULL => "FullOuterJoin"
+ case JoinRelType.SEMI => "SemiJoin"
+ case JoinRelType.ANTI => "AntiJoin"
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 1df75e6..2e319f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -199,6 +199,7 @@ class DataSetJoin(
rightKeys.toArray,
returnType,
config)
+ case _ => throw new TableException(s"$joinType is not supported.")
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 846e452..8c418a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -18,13 +18,11 @@
package org.apache.flink.table.plan.nodes.datastream
-import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator
-import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, ValidationException}
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.CRowKeySelector
@@ -32,6 +30,9 @@ import org.apache.flink.table.runtime.join._
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+
class DataStreamJoinToCoProcessTranslator(
config: TableConfig,
returnType: TypeInformation[Row],
@@ -145,6 +146,7 @@ class DataStreamJoinToCoProcessTranslator(
genFunction.name,
genFunction.code,
queryConfig)
+ case _ => throw new ValidationException(s"$joinType is not supported.")
}
new LegacyKeyedCoProcessOperator(joinFunction)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 8c06265..0b34df9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -150,6 +150,7 @@ class DataStreamWindowJoin(
case JoinRelType.FULL => JoinType.FULL_OUTER
case JoinRelType.LEFT => JoinType.LEFT_OUTER
case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+ case _ => throw new TableException(s"$joinType is not supported.")
}
if (relativeWindowSize < 0) {
@@ -232,6 +233,7 @@ class DataStreamWindowJoin(
case JoinType.FULL_OUTER =>
leftDataStream.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
.union(rightDataStream.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
+ case _ => throw new TableException(s"$joinType is not supported.")
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 3e355e8..84d45f1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.runtime.join
import java.util
-
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinRelType
@@ -27,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.{SqlKind, SqlOperatorTable}
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.functions.sql.ProctimeSqlFunction
@@ -444,6 +443,7 @@ object WindowJoinUtil {
case JoinRelType.LEFT => true
case JoinRelType.RIGHT => true
case JoinRelType.FULL => true
+ case _ => throw new TableException(s"$joinType is not supported.")
}
// generate other non-equi function code