You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/01 03:30:56 UTC

[flink] 01/02: [FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 50c957adcff228f1390faa6823a72d93cf0cf92e
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 25 17:06:50 2019 +0800

    [FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case
    
    This closes #9227.
---
 .../scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala | 2 +-
 .../main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala | 2 ++
 .../org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala   | 1 +
 .../nodes/datastream/DataStreamJoinToCoProcessTranslator.scala    | 8 +++++---
 .../flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala  | 2 ++
 .../org/apache/flink/table/runtime/join/WindowJoinUtil.scala      | 4 ++--
 6 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index bd2e8fa..923307c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -100,7 +100,7 @@ trait CommonCorrelate {
            |}
            |""".stripMargin
     } else if (joinType != JoinRelType.INNER) {
-      throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+      throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.")
     }
 
     functionGenerator.generateFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
index 3d98a4d..753ec40 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -45,6 +45,8 @@ trait CommonJoin {
       case JoinRelType.LEFT=> "LeftOuterJoin"
       case JoinRelType.RIGHT => "RightOuterJoin"
       case JoinRelType.FULL => "FullOuterJoin"
+      case JoinRelType.SEMI => "SemiJoin"
+      case JoinRelType.ANTI => "AntiJoin"
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 1df75e6..2e319f4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -199,6 +199,7 @@ class DataSetJoin(
           rightKeys.toArray,
           returnType,
           config)
+      case _ => throw new TableException(s"$joinType is not supported.")
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 846e452..8c418a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -18,13 +18,11 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator
-import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, ValidationException}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowKeySelector
@@ -32,6 +30,9 @@ import org.apache.flink.table.runtime.join._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+
 class DataStreamJoinToCoProcessTranslator(
     config: TableConfig,
     returnType: TypeInformation[Row],
@@ -145,6 +146,7 @@ class DataStreamJoinToCoProcessTranslator(
           genFunction.name,
           genFunction.code,
           queryConfig)
+      case _ => throw new ValidationException(s"$joinType is not supported.")
     }
     new LegacyKeyedCoProcessOperator(joinFunction)
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 8c06265..0b34df9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -150,6 +150,7 @@ class DataStreamWindowJoin(
       case JoinRelType.FULL => JoinType.FULL_OUTER
       case JoinRelType.LEFT => JoinType.LEFT_OUTER
       case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+      case _ => throw new TableException(s"$joinType is not supported.")
     }
 
     if (relativeWindowSize < 0) {
@@ -232,6 +233,7 @@ class DataStreamWindowJoin(
       case JoinType.FULL_OUTER =>
         leftDataStream.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
           .union(rightDataStream.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
+      case _ => throw new TableException(s"$joinType is not supported.")
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 3e355e8..84d45f1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.runtime.join
 
 import java.util
-
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
@@ -27,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.{SqlKind, SqlOperatorTable}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
 import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.functions.sql.ProctimeSqlFunction
@@ -444,6 +443,7 @@ object WindowJoinUtil {
       case JoinRelType.LEFT => true
       case JoinRelType.RIGHT => true
       case JoinRelType.FULL => true
+      case _ => throw new TableException(s"$joinType is not supported.")
     }
 
     // generate other non-equi function code