You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:41 UTC

[flink] 08/11: [hotfix][table] Simplify NonWindowJoin class

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21082a2c349ffc15df064e6250340cdcb748c253
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 19 16:55:47 2018 +0200

    [hotfix][table] Simplify NonWindowJoin class
---
 .../DataStreamJoinToCoProcessTranslator.scala      |  5 --
 .../table/runtime/join/NonWindowFullJoin.scala     |  3 -
 .../NonWindowFullJoinWithNonEquiPredicates.scala   |  3 -
 .../table/runtime/join/NonWindowInnerJoin.scala    |  3 -
 .../flink/table/runtime/join/NonWindowJoin.scala   |  2 -
 .../runtime/join/NonWindowLeftRightJoin.scala      |  3 -
 ...nWindowLeftRightJoinWithNonEquiPredicates.scala |  2 -
 .../table/runtime/join/NonWindowOuterJoin.scala    |  8 +--
 .../NonWindowOuterJoinWithNonEquiPredicates.scala  |  8 +--
 .../table/runtime/harness/JoinHarnessTest.scala    | 64 ----------------------
 10 files changed, 6 insertions(+), 95 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 5a8d1a4..054476a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -111,7 +111,6 @@ class DataStreamJoinToCoProcessTranslator(
         new NonWindowInnerJoin(
           leftSchema.typeInfo,
           rightSchema.typeInfo,
-          CRowTypeInfo(returnType),
           genFunction.name,
           genFunction.code,
           queryConfig)
@@ -119,7 +118,6 @@ class DataStreamJoinToCoProcessTranslator(
         new NonWindowLeftRightJoin(
           leftSchema.typeInfo,
           rightSchema.typeInfo,
-          CRowTypeInfo(returnType),
           genFunction.name,
           genFunction.code,
           joinType == JoinRelType.LEFT,
@@ -128,7 +126,6 @@ class DataStreamJoinToCoProcessTranslator(
         new NonWindowLeftRightJoinWithNonEquiPredicates(
           leftSchema.typeInfo,
           rightSchema.typeInfo,
-          CRowTypeInfo(returnType),
           genFunction.name,
           genFunction.code,
           joinType == JoinRelType.LEFT,
@@ -137,7 +134,6 @@ class DataStreamJoinToCoProcessTranslator(
         new NonWindowFullJoin(
           leftSchema.typeInfo,
           rightSchema.typeInfo,
-          CRowTypeInfo(returnType),
           genFunction.name,
           genFunction.code,
           queryConfig)
@@ -145,7 +141,6 @@ class DataStreamJoinToCoProcessTranslator(
         new NonWindowFullJoinWithNonEquiPredicates(
           leftSchema.typeInfo,
           rightSchema.typeInfo,
-          CRowTypeInfo(returnType),
           genFunction.name,
           genFunction.code,
           queryConfig)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
index d2bcb6a..57c60f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
@@ -33,7 +33,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param resultType      the output type of join
   * @param genJoinFuncName the function code without any non-equi condition
   * @param genJoinFuncCode the function name without any non-equi condition
   * @param queryConfig     the configuration for the query to generate
@@ -41,14 +40,12 @@ import org.apache.flink.util.Collector
 class NonWindowFullJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
   extends NonWindowOuterJoin(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     false,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
index b442a88..9c27eb4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
@@ -35,7 +35,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param resultType      the output type of join
   * @param genJoinFuncName the function code of other non-equi condition
   * @param genJoinFuncCode the function name of other non-equi condition
   * @param queryConfig     the configuration for the query to generate
@@ -43,14 +42,12 @@ import org.apache.flink.util.Collector
 class NonWindowFullJoinWithNonEquiPredicates(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
   extends NonWindowOuterJoinWithNonEquiPredicates(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     false,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
index e511ed1..2e5832c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
@@ -32,7 +32,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType          the input type of left stream
   * @param rightType         the input type of right stream
-  * @param resultType        the output type of join
   * @param genJoinFuncName   the function code of other non-equi condition
   * @param genJoinFuncCode   the function name of other non-equi condition
   * @param queryConfig       the configuration for the query to generate
@@ -40,14 +39,12 @@ import org.apache.flink.util.Collector
 class NonWindowInnerJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
   extends NonWindowJoin(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     queryConfig) {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
index 51db755..0fe2e39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
@@ -37,7 +37,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType          the input type of left stream
   * @param rightType         the input type of right stream
-  * @param resultType        the output type of join
   * @param genJoinFuncName   the function code of other non-equi condition
   * @param genJoinFuncCode   the function name of other non-equi condition
   * @param queryConfig       the configuration for the query to generate
@@ -45,7 +44,6 @@ import org.apache.flink.util.Collector
 abstract class NonWindowJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
index a595712..b4f14e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
@@ -33,7 +33,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param resultType      the output type of join
   * @param genJoinFuncName the function code without any non-equi condition
   * @param genJoinFuncCode the function name without any non-equi condition
   * @param isLeftJoin      the type of join, whether it is the type of left join
@@ -42,7 +41,6 @@ import org.apache.flink.util.Collector
 class NonWindowLeftRightJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     isLeftJoin: Boolean,
@@ -50,7 +48,6 @@ class NonWindowLeftRightJoin(
   extends NonWindowOuterJoin(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     isLeftJoin,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
index f3a499a..33517cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
@@ -44,7 +44,6 @@ import org.apache.flink.util.Collector
 class NonWindowLeftRightJoinWithNonEquiPredicates(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     isLeftJoin: Boolean,
@@ -52,7 +51,6 @@ class NonWindowLeftRightJoinWithNonEquiPredicates(
   extends NonWindowOuterJoinWithNonEquiPredicates(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     isLeftJoin,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
index 8877b89..0018a16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
@@ -31,7 +31,6 @@ import org.apache.flink.util.Collector
   *
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param resultType      the output type of join
   * @param genJoinFuncName the function code of other non-equi condition
   * @param genJoinFuncCode the function name of other non-equi condition
   * @param isLeftJoin      the type of join, whether it is the type of left join
@@ -40,7 +39,6 @@ import org.apache.flink.util.Collector
 abstract class NonWindowOuterJoin(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     isLeftJoin: Boolean,
@@ -48,7 +46,6 @@ abstract class NonWindowOuterJoin(
   extends NonWindowJoin(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     queryConfig) {
@@ -60,8 +57,9 @@ abstract class NonWindowOuterJoin(
 
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
-    leftResultRow = new Row(resultType.getArity)
-    rightResultRow = new Row(resultType.getArity)
+    val arity = leftType.getArity + rightType.getArity
+    leftResultRow = new Row(arity)
+    rightResultRow = new Row(arity)
     LOG.debug(s"Instantiating NonWindowOuterJoin")
   }
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
index 6812a06..8fe2f4f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
@@ -32,7 +32,6 @@ import org.apache.flink.types.Row
   *
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param resultType      the output type of join
   * @param genJoinFuncName the function code of other non-equi condition
   * @param genJoinFuncCode the function name of other non-equi condition
   * @param isLeftJoin      the type of join, whether it is the type of left join
@@ -41,7 +40,6 @@ import org.apache.flink.types.Row
   abstract class NonWindowOuterJoinWithNonEquiPredicates(
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
-    resultType: TypeInformation[CRow],
     genJoinFuncName: String,
     genJoinFuncCode: String,
     isLeftJoin: Boolean,
@@ -49,7 +47,6 @@ import org.apache.flink.types.Row
   extends NonWindowOuterJoin(
     leftType,
     rightType,
-    resultType,
     genJoinFuncName,
     genJoinFuncCode,
     isLeftJoin,
@@ -64,8 +61,9 @@ import org.apache.flink.types.Row
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
 
-    leftResultRow = new Row(resultType.getArity)
-    rightResultRow = new Row(resultType.getArity)
+    val arity = leftType.getArity + rightType.getArity
+    leftResultRow = new Row(arity)
+    rightResultRow = new Row(arity)
 
     joinCntState = new Array[MapState[Row, Long]](2)
     val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 132ac4e..c499a9d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -861,7 +861,6 @@ class JoinHarnessTest extends HarnessTestBase {
     val joinProcessFunc = new NonWindowInnerJoin(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCode,
       queryConfig)
@@ -953,18 +952,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowInnerJoinWithRetract() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowInnerJoin(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCode,
       queryConfig)
@@ -1053,18 +1043,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowLeftJoinWithoutNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowLeftRightJoin(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCode,
       true,
@@ -1170,18 +1151,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowLeftJoinWithNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCodeWithNonEqualPred,
       true,
@@ -1309,18 +1281,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowRightJoinWithoutNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowLeftRightJoin(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCode,
       false,
@@ -1426,18 +1389,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowRightJoinWithNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCodeWithNonEqualPred2,
       false,
@@ -1565,18 +1519,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowFullJoinWithoutNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowFullJoin(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCode,
       queryConfig)
@@ -1743,18 +1688,9 @@ class JoinHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowFullJoinWithNonEqualPred() {
 
-    val joinReturnType = CRowTypeInfo(new RowTypeInfo(
-      Array[TypeInformation[_]](
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO,
-        INT_TYPE_INFO,
-        STRING_TYPE_INFO),
-      Array("a", "b", "c", "d")))
-
     val joinProcessFunc = new NonWindowFullJoinWithNonEquiPredicates(
       rowType,
       rowType,
-      joinReturnType,
       "TestJoinFunction",
       funcCodeWithNonEqualPred2,
       queryConfig)