You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:41 UTC
[flink] 08/11: [hotfix][table] Simplify NonWindowJoin class
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21082a2c349ffc15df064e6250340cdcb748c253
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 19 16:55:47 2018 +0200
[hotfix][table] Simplify NonWindowJoin class
---
.../DataStreamJoinToCoProcessTranslator.scala | 5 --
.../table/runtime/join/NonWindowFullJoin.scala | 3 -
.../NonWindowFullJoinWithNonEquiPredicates.scala | 3 -
.../table/runtime/join/NonWindowInnerJoin.scala | 3 -
.../flink/table/runtime/join/NonWindowJoin.scala | 2 -
.../runtime/join/NonWindowLeftRightJoin.scala | 3 -
...nWindowLeftRightJoinWithNonEquiPredicates.scala | 2 -
.../table/runtime/join/NonWindowOuterJoin.scala | 8 +--
.../NonWindowOuterJoinWithNonEquiPredicates.scala | 8 +--
.../table/runtime/harness/JoinHarnessTest.scala | 64 ----------------------
10 files changed, 6 insertions(+), 95 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 5a8d1a4..054476a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -111,7 +111,6 @@ class DataStreamJoinToCoProcessTranslator(
new NonWindowInnerJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
- CRowTypeInfo(returnType),
genFunction.name,
genFunction.code,
queryConfig)
@@ -119,7 +118,6 @@ class DataStreamJoinToCoProcessTranslator(
new NonWindowLeftRightJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
- CRowTypeInfo(returnType),
genFunction.name,
genFunction.code,
joinType == JoinRelType.LEFT,
@@ -128,7 +126,6 @@ class DataStreamJoinToCoProcessTranslator(
new NonWindowLeftRightJoinWithNonEquiPredicates(
leftSchema.typeInfo,
rightSchema.typeInfo,
- CRowTypeInfo(returnType),
genFunction.name,
genFunction.code,
joinType == JoinRelType.LEFT,
@@ -137,7 +134,6 @@ class DataStreamJoinToCoProcessTranslator(
new NonWindowFullJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
- CRowTypeInfo(returnType),
genFunction.name,
genFunction.code,
queryConfig)
@@ -145,7 +141,6 @@ class DataStreamJoinToCoProcessTranslator(
new NonWindowFullJoinWithNonEquiPredicates(
leftSchema.typeInfo,
rightSchema.typeInfo,
- CRowTypeInfo(returnType),
genFunction.name,
genFunction.code,
queryConfig)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
index d2bcb6a..57c60f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala
@@ -33,7 +33,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code without any non-equi condition
* @param genJoinFuncCode the function name without any non-equi condition
* @param queryConfig the configuration for the query to generate
@@ -41,14 +40,12 @@ import org.apache.flink.util.Collector
class NonWindowFullJoin(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
extends NonWindowOuterJoin(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
false,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
index b442a88..9c27eb4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala
@@ -35,7 +35,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code of other non-equi condition
* @param genJoinFuncCode the function name of other non-equi condition
* @param queryConfig the configuration for the query to generate
@@ -43,14 +42,12 @@ import org.apache.flink.util.Collector
class NonWindowFullJoinWithNonEquiPredicates(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
extends NonWindowOuterJoinWithNonEquiPredicates(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
false,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
index e511ed1..2e5832c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
@@ -32,7 +32,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code of other non-equi condition
* @param genJoinFuncCode the function name of other non-equi condition
* @param queryConfig the configuration for the query to generate
@@ -40,14 +39,12 @@ import org.apache.flink.util.Collector
class NonWindowInnerJoin(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
extends NonWindowJoin(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
queryConfig) {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
index 51db755..0fe2e39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
@@ -37,7 +37,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code of other non-equi condition
* @param genJoinFuncCode the function name of other non-equi condition
* @param queryConfig the configuration for the query to generate
@@ -45,7 +44,6 @@ import org.apache.flink.util.Collector
abstract class NonWindowJoin(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
index a595712..b4f14e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
@@ -33,7 +33,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code without any non-equi condition
* @param genJoinFuncCode the function name without any non-equi condition
* @param isLeftJoin the type of join, whether it is the type of left join
@@ -42,7 +41,6 @@ import org.apache.flink.util.Collector
class NonWindowLeftRightJoin(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
isLeftJoin: Boolean,
@@ -50,7 +48,6 @@ class NonWindowLeftRightJoin(
extends NonWindowOuterJoin(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
isLeftJoin,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
index f3a499a..33517cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala
@@ -44,7 +44,6 @@ import org.apache.flink.util.Collector
class NonWindowLeftRightJoinWithNonEquiPredicates(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
isLeftJoin: Boolean,
@@ -52,7 +51,6 @@ class NonWindowLeftRightJoinWithNonEquiPredicates(
extends NonWindowOuterJoinWithNonEquiPredicates(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
isLeftJoin,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
index 8877b89..0018a16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
@@ -31,7 +31,6 @@ import org.apache.flink.util.Collector
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code of other non-equi condition
* @param genJoinFuncCode the function name of other non-equi condition
* @param isLeftJoin the type of join, whether it is the type of left join
@@ -40,7 +39,6 @@ import org.apache.flink.util.Collector
abstract class NonWindowOuterJoin(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
isLeftJoin: Boolean,
@@ -48,7 +46,6 @@ abstract class NonWindowOuterJoin(
extends NonWindowJoin(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
queryConfig) {
@@ -60,8 +57,9 @@ abstract class NonWindowOuterJoin(
override def open(parameters: Configuration): Unit = {
super.open(parameters)
- leftResultRow = new Row(resultType.getArity)
- rightResultRow = new Row(resultType.getArity)
+ val arity = leftType.getArity + rightType.getArity
+ leftResultRow = new Row(arity)
+ rightResultRow = new Row(arity)
LOG.debug(s"Instantiating NonWindowOuterJoin")
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
index 6812a06..8fe2f4f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala
@@ -32,7 +32,6 @@ import org.apache.flink.types.Row
*
* @param leftType the input type of left stream
* @param rightType the input type of right stream
- * @param resultType the output type of join
* @param genJoinFuncName the function code of other non-equi condition
* @param genJoinFuncCode the function name of other non-equi condition
* @param isLeftJoin the type of join, whether it is the type of left join
@@ -41,7 +40,6 @@ import org.apache.flink.types.Row
abstract class NonWindowOuterJoinWithNonEquiPredicates(
leftType: TypeInformation[Row],
rightType: TypeInformation[Row],
- resultType: TypeInformation[CRow],
genJoinFuncName: String,
genJoinFuncCode: String,
isLeftJoin: Boolean,
@@ -49,7 +47,6 @@ import org.apache.flink.types.Row
extends NonWindowOuterJoin(
leftType,
rightType,
- resultType,
genJoinFuncName,
genJoinFuncCode,
isLeftJoin,
@@ -64,8 +61,9 @@ import org.apache.flink.types.Row
override def open(parameters: Configuration): Unit = {
super.open(parameters)
- leftResultRow = new Row(resultType.getArity)
- rightResultRow = new Row(resultType.getArity)
+ val arity = leftType.getArity + rightType.getArity
+ leftResultRow = new Row(arity)
+ rightResultRow = new Row(arity)
joinCntState = new Array[MapState[Row, Long]](2)
val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 132ac4e..c499a9d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -861,7 +861,6 @@ class JoinHarnessTest extends HarnessTestBase {
val joinProcessFunc = new NonWindowInnerJoin(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCode,
queryConfig)
@@ -953,18 +952,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowInnerJoinWithRetract() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowInnerJoin(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCode,
queryConfig)
@@ -1053,18 +1043,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowLeftJoinWithoutNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowLeftRightJoin(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCode,
true,
@@ -1170,18 +1151,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowLeftJoinWithNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCodeWithNonEqualPred,
true,
@@ -1309,18 +1281,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowRightJoinWithoutNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowLeftRightJoin(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCode,
false,
@@ -1426,18 +1389,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowRightJoinWithNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowLeftRightJoinWithNonEquiPredicates(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCodeWithNonEqualPred2,
false,
@@ -1565,18 +1519,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowFullJoinWithoutNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowFullJoin(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCode,
queryConfig)
@@ -1743,18 +1688,9 @@ class JoinHarnessTest extends HarnessTestBase {
@Test
def testNonWindowFullJoinWithNonEqualPred() {
- val joinReturnType = CRowTypeInfo(new RowTypeInfo(
- Array[TypeInformation[_]](
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- INT_TYPE_INFO,
- STRING_TYPE_INFO),
- Array("a", "b", "c", "d")))
-
val joinProcessFunc = new NonWindowFullJoinWithNonEquiPredicates(
rowType,
rowType,
- joinReturnType,
"TestJoinFunction",
funcCodeWithNonEqualPred2,
queryConfig)