Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:50:52 UTC
[1/5] flink git commit: [FLINK-7971] [table] Fix potential NPE in non-windowed aggregation.
Repository: flink
Updated Branches:
refs/heads/release-1.4 f5a0b4bdf -> c79432915
[FLINK-7971] [table] Fix potential NPE in non-windowed aggregation.
This closes #4941.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a126bd3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a126bd3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a126bd3e
Branch: refs/heads/release-1.4
Commit: a126bd3e7d9614749f61692fbb53c5b284f17091
Parents: f5a0b4b
Author: Xpray <le...@gmail.com>
Authored: Fri Nov 3 15:19:42 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:31 2017 +0100
----------------------------------------------------------------------
.../flink/table/runtime/aggregate/GroupAggProcessFunction.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a126bd3e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 91c379f..3970320 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -97,11 +97,14 @@ class GroupAggProcessFunction(
if (null == accumulators) {
firstRow = true
accumulators = function.createAccumulators()
- inputCnt = 0L
} else {
firstRow = false
}
+ if (null == inputCnt) {
+ inputCnt = 0L
+ }
+
// Set group keys value to the final output
function.setForwardedFields(input, newRow.row)
function.setForwardedFields(input, prevRow.row)
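For context, the NPE can arise because the per-key counter is a boxed value (presumably read from keyed state, where ValueState#value() returns null for a key without prior state), so incrementing it without a null check throws. Below is a minimal, self-contained sketch of the failure mode and of the guard added above; NullCounterSketch and readCounterFromState are illustrative names, not Flink API.

object NullCounterSketch {
  // Stand-in for reading the counter from keyed state; ValueState#value()
  // returns null for a key that has no state yet.
  def readCounterFromState(): java.lang.Long = null

  def main(args: Array[String]): Unit = {
    var inputCnt: java.lang.Long = readCounterFromState()
    // Without this guard, unboxing inputCnt in the arithmetic below throws a
    // NullPointerException whenever the counter state is still empty.
    if (null == inputCnt) {
      inputCnt = 0L
    }
    inputCnt = inputCnt + 1
    println(inputCnt) // 1
  }
}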
[4/5] flink git commit: [FLINK-8012] [table] Fix TableSink config for tables with time attributes.
Posted by fh...@apache.org.
[FLINK-8012] [table] Fix TableSink config for tables with time attributes.
This closes #4974.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51657fc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51657fc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51657fc6
Branch: refs/heads/release-1.4
Commit: 51657fc6deaf28115020db86d031d536b09bf384
Parents: 1b20f70
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:57 2017 +0100
----------------------------------------------------------------------
.../scala/org/apache/flink/table/api/table.scala | 7 ++++++-
.../table/runtime/stream/table/TableSinkITCase.scala | 15 ++++++++++++---
2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51657fc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0430e49..7349a0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -848,7 +848,12 @@ class Table(
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+ .map {
+ // replace time indicator types by SQL_TIMESTAMP
+ case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+ case t: TypeInformation[_] => t
+ }.toArray
// configure the table sink
val configuredSink = sink.configure(fieldNames, fieldTypes)
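The change above replaces Flink's internal time indicator types with the plain SQL timestamp type before the sink is configured, so a generic TableSink only sees regular field types. A standalone sketch of that mapping follows, assuming only the FlinkTypeFactory.isTimeIndicatorType check and the Types.SQL_TIMESTAMP constant used in the diff; the object and method names are illustrative.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.calcite.FlinkTypeFactory

object SinkTypeMapping {
  // Illustrative helper: maps the field types a TableSink is configured with,
  // replacing time indicator types by SQL_TIMESTAMP as the patch does inline.
  def toSinkFieldTypes(fieldTypes: Array[TypeInformation[_]]): Array[TypeInformation[_]] =
    fieldTypes.map {
      case t if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
      case t => t
    }
}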
http://git-wip-us.apache.org/repos/asf/flink/blob/51657fc6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 07934b8..b44d8ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -88,13 +88,16 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(4)
val input = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4
- val results = input.toTable(tEnv, 'a, 'b, 'c)
+ val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))
@@ -102,8 +105,14 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
env.execute()
val expected = Seq(
- "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
- "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+ "Hi,1970-01-01 00:00:00.001",
+ "Hello,1970-01-01 00:00:00.002",
+ "Hello world,1970-01-01 00:00:00.002",
+ "Hello world, how are you?,1970-01-01 00:00:00.003",
+ "Comment#12,1970-01-01 00:00:00.006",
+ "Comment#13,1970-01-01 00:00:00.006",
+ "Comment#14,1970-01-01 00:00:00.006",
+ "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}
[2/5] flink git commit: [FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.
Posted by fh...@apache.org.
[FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.
This closes #4929.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c60f97a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c60f97a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c60f97a
Branch: refs/heads/release-1.4
Commit: 8c60f97a43defc57bb1bfaabdd6081b329db53b8
Parents: a126bd3
Author: Rong Rong <ro...@uber.com>
Authored: Tue Oct 31 11:05:38 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:39 2017 +0100
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 39 +++++----
.../table/api/batch/sql/SetOperatorsTest.scala | 55 +++++++++++++
.../api/batch/table/SetOperatorsTest.scala | 59 ++++++++++++++
.../flink/table/api/stream/sql/UnionTest.scala | 83 ++++++++++++++++++++
.../table/expressions/ScalarOperatorsTest.scala | 5 +-
.../utils/ScalarOperatorsTestBase.scala | 9 ++-
.../stream/table/SetOperatorsITCase.scala | 20 ++++-
.../table/runtime/utils/CommonTestData.scala | 6 ++
.../flink/table/utils/TableTestBase.scala | 26 ++++++
9 files changed, 280 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 2874e61..04fab76 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -268,30 +268,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
val type0 = types.get(0)
if (type0.getSqlTypeName != null) {
- val resultType = resolveAny(types)
- if (resultType != null) {
- return resultType
+ val resultType = resolveAllIdenticalTypes(types)
+ if (resultType.isDefined) {
+ // result type for identical types
+ return resultType.get
}
}
+ // fall back to super
super.leastRestrictive(types)
}
- private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+ private def resolveAllIdenticalTypes(types: util.List[RelDataType]): Option[RelDataType] = {
val allTypes = types.asScala
- val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
- if (hasAny) {
- val head = allTypes.head
- // only allow ANY with exactly the same GenericRelDataType for all types
- if (allTypes.forall(_ == head)) {
- val nullable = allTypes.exists(
- sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
- )
- createTypeWithNullability(head, nullable)
- } else {
+
+ val head = allTypes.head
+ // check if all types are the same
+ if (allTypes.forall(_ == head)) {
+ // types are the same, check nullability
+ val nullable = allTypes
+ .exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL)
+ // return type with nullability
+ Some(createTypeWithNullability(head, nullable))
+ } else {
+ // types are not all the same
+ if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
+ // one of the types was ANY.
+ // we cannot generate a common type if it differs from other types.
throw TableException("Generic ANY types must have a common type information.")
+ } else {
+ // cannot resolve a common type for different input types
+ None
}
- } else {
- null
}
}
}
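In short, the rewritten method only short-circuits Calcite's resolution when all input types are identical (returning that type, nullable if any input is nullable or NULL), still rejects differing generic ANY types, and otherwise falls back to super.leastRestrictive. The following is a simplified, self-contained sketch of that decision logic over a toy (name, nullable) type model instead of Calcite's RelDataType; it is illustrative only.

object LeastRestrictiveSketch {
  // Toy stand-in for RelDataType: a type name plus nullability.
  final case class SimpleType(name: String, nullable: Boolean)

  def resolveAllIdenticalTypes(types: Seq[SimpleType]): Option[SimpleType] = {
    val head = types.head
    if (types.forall(_.name == head.name)) {
      // identical types: the result is nullable if any input is nullable
      Some(head.copy(nullable = types.exists(_.nullable)))
    } else if (types.exists(_.name == "ANY")) {
      // differing generic ANY types cannot be unified
      throw new RuntimeException("Generic ANY types must have a common type information.")
    } else {
      // no common type found here; the caller falls back to Calcite's default
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val rowType = SimpleType("ROW(a INT, b STRING)", nullable = false)
    println(resolveAllIdenticalTypes(Seq(rowType, rowType.copy(nullable = true))))
    // Some(SimpleType(ROW(a INT, b STRING),true))
  }
}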
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
index bc9b453..bff0b78 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
@@ -18,8 +18,11 @@
package org.apache.flink.table.api.batch.sql
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.TableTestBase
import org.junit.{Ignore, Test}
@@ -178,4 +181,56 @@ class SetOperatorsTest extends TableTestBase {
expected
)
}
+
+ @Test
+ def testUnionNullableTypes(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
+ ),
+ term("union", "a")
+ )
+
+ util.verifySql(
+ "SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A",
+ expected
+ )
+ }
+
+ @Test
+ def testUnionAnyType(): Unit = {
+ val util = batchTestUtil()
+ val typeInfo = Types.ROW(
+ new GenericTypeInfo(classOf[NonPojo]),
+ new GenericTypeInfo(classOf[NonPojo]))
+ util.addJavaTable(typeInfo, "A", "a, b")
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b")
+ ),
+ term("union", "a")
+ )
+
+ util.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 686973e..2d4e205 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -20,8 +20,12 @@ package org.apache.flink.table.api.batch.table
import java.sql.Timestamp
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
@@ -76,4 +80,59 @@ class SetOperatorsTest extends TableTestBase {
util.verifyTable(in, expected)
}
+
+ @Test
+ def testUnionNullableTypes(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+ val in = t.select('a)
+ .unionAll(
+ t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)]))))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "CASE(>(c, 0), b, null) AS _c0")
+ ),
+ term("union", "a")
+ )
+
+ util.verifyTable(in, expected)
+ }
+
+ @Test
+ def testUnionAnyType(): Unit = {
+ val util = batchTestUtil()
+ val typeInfo = Types.ROW(
+ new GenericTypeInfo(classOf[NonPojo]),
+ new GenericTypeInfo(classOf[NonPojo]))
+ val t = util.addJavaTable(typeInfo, "A", "a, b")
+
+ val in = t.select('a).unionAll(t.select('b))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b")
+ ),
+ term("union", "a")
+ )
+
+ util.verifyJavaTable(in, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
new file mode 100644
index 0000000..7e807f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UnionTest extends TableTestBase {
+
+ @Test
+ def testUnionAllNullableCompositeType() = {
+ val streamUtil = streamTestUtil()
+ streamUtil.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
+ ),
+ term("union all", "a")
+ )
+
+ streamUtil.verifySql(
+ "SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A",
+ expected
+ )
+ }
+
+ @Test
+ def testUnionAnyType(): Unit = {
+ val streamUtil = streamTestUtil()
+ val typeInfo = Types.ROW(
+ new GenericTypeInfo(classOf[NonPojo]),
+ new GenericTypeInfo(classOf[NonPojo]))
+ streamUtil.addJavaTable(typeInfo, "A", "a, b")
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b")
+ ),
+ term("union all", "a")
+ )
+
+ streamUtil.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index cbdce8b..6dd2afc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -211,7 +211,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
@Test
def testOtherExpressions(): Unit = {
-
// nested field null type
testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
@@ -222,6 +221,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+ // array element access test
+ testSqlApi("CASE WHEN f18 IS NOT NULL THEN f18[1] ELSE NULL END", "1")
+ testSqlApi("CASE WHEN f19 IS NOT NULL THEN f19[1] ELSE NULL END", "(1,a)")
+
// boolean literals
testAllApis(
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 149d8c1..3eeb215 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -22,6 +22,7 @@ import java.sql.Date
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
@@ -29,7 +30,7 @@ import org.apache.flink.types.Row
class ScalarOperatorsTestBase extends ExpressionTestBase {
def testData: Row = {
- val testData = new Row(18)
+ val testData = new Row(20)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -48,6 +49,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
testData.setField(15, Date.valueOf("1996-11-10"))
testData.setField(16, BigDecimal("0.00000000").bigDecimal)
testData.setField(17, BigDecimal("10.0").bigDecimal)
+ testData.setField(18, Array[Integer](1,2))
+ testData.setField(19, Array[(Int, String)]((1,"a"), (2, "b")))
testData
}
@@ -70,7 +73,9 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
Types.STRING,
Types.SQL_DATE,
Types.DECIMAL,
- Types.DECIMAL
+ Types.DECIMAL,
+ Types.OBJECT_ARRAY(Types.INT),
+ Types.OBJECT_ARRAY(createTypeInformation[(Int, String)])
).asInstanceOf[TypeInformation[Any]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index cf195a5..5e15e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -88,9 +89,22 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
- class NonPojo {
- val x = new java.util.HashMap[String, String]()
+ @Test
+ def testUnionWithCompositeType(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val s1 = env.fromElements((1, (1, "a")), (2, (2, "b")))
+ .toTable(tEnv, 'a, 'b)
+ val s2 = env.fromElements(((3, "c"), 3), ((4, "d"), 4))
+ .toTable(tEnv, 'a, 'b)
- override def toString: String = x.toString
+ val result = s1.unionAll(s2.select('b, 'a)).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("1,(1,a)", "2,(2,b)", "3,(3,c)", "4,(4,d)")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e8568ca..9223887 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -176,4 +176,10 @@ object CommonTestData {
this(null, null)
}
}
+
+ class NonPojo {
+ val x = new java.util.HashMap[String, String]()
+
+ override def toString: String = x.toString
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 0a0d12e..4042f50 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -190,6 +190,19 @@ case class BatchTableTestUtil() extends TableTestUtil {
actual.split("\n").map(_.trim).mkString("\n"))
}
+ def verifyJavaSql(query: String, expected: String): Unit = {
+ verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
+ }
+
+ def verifyJavaTable(resultTable: Table, expected: String): Unit = {
+ val relNode = resultTable.getRelNode
+ val optimized = javaTableEnv.optimize(relNode)
+ val actual = RelOptUtil.toString(optimized)
+ assertEquals(
+ expected.split("\n").map(_.trim).mkString("\n"),
+ actual.split("\n").map(_.trim).mkString("\n"))
+ }
+
def printTable(resultTable: Table): Unit = {
val relNode = resultTable.getRelNode
val optimized = tableEnv.optimize(relNode)
@@ -268,6 +281,19 @@ case class StreamTableTestUtil() extends TableTestUtil {
actual.split("\n").map(_.trim).mkString("\n"))
}
+ def verifyJavaSql(query: String, expected: String): Unit = {
+ verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
+ }
+
+ def verifyJavaTable(resultTable: Table, expected: String): Unit = {
+ val relNode = resultTable.getRelNode
+ val optimized = javaTableEnv.optimize(relNode, updatesAsRetraction = false)
+ val actual = RelOptUtil.toString(optimized)
+ assertEquals(
+ expected.split("\n").map(_.trim).mkString("\n"),
+ actual.split("\n").map(_.trim).mkString("\n"))
+ }
+
// the print methods are for debugging purposes only
def printTable(resultTable: Table): Unit = {
val relNode = resultTable.getRelNode
[5/5] flink git commit: [FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates.
Posted by fh...@apache.org.
[FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates.
This closes #4962.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7943291
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7943291
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7943291
Branch: refs/heads/release-1.4
Commit: c7943291599260003304f003e89725352ae7d836
Parents: 51657fc
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 6 21:22:35 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:44:06 2017 +0100
----------------------------------------------------------------------
.../table/runtime/join/WindowJoinUtil.scala | 8 +++--
.../flink/table/api/stream/sql/JoinTest.scala | 38 ++++++++++++++++++--
2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 1693c41..7006476 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -346,10 +346,14 @@ object WindowJoinUtil {
leftLiteral.get - rightLiteral.get
}
val boundary = timePred.pred.getKind match {
- case SqlKind.LESS_THAN =>
+ case SqlKind.LESS_THAN if timePred.leftInputOnLeftSide =>
tmpTimeOffset - 1
- case SqlKind.GREATER_THAN =>
+ case SqlKind.LESS_THAN if !timePred.leftInputOnLeftSide =>
tmpTimeOffset + 1
+ case SqlKind.GREATER_THAN if timePred.leftInputOnLeftSide =>
+ tmpTimeOffset + 1
+ case SqlKind.GREATER_THAN if !timePred.leftInputOnLeftSide =>
+ tmpTimeOffset - 1
case _ =>
tmpTimeOffset
}
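The new guards tighten the boundary by one millisecond for strict comparisons, in a direction that depends on which input's time attribute is on the left side of the predicate; previously the adjustment ignored that side, so the bound could shift the wrong way when the inputs were swapped. Below is a small self-contained sketch of the rule and the bound it yields for one of the new test cases; the object and type names are illustrative.

object StrictBoundSketch {
  sealed trait Kind
  case object LessThan extends Kind
  case object GreaterThan extends Kind

  // Mirrors the case analysis above: strict '<' / '>' shrink the window by
  // 1 ms, in opposite directions depending on whether the left input's time
  // attribute is on the left side of the comparison.
  def boundary(offsetMillis: Long, kind: Kind, leftInputOnLeftSide: Boolean): Long =
    (kind, leftInputOnLeftSide) match {
      case (LessThan, true)     => offsetMillis - 1
      case (LessThan, false)    => offsetMillis + 1
      case (GreaterThan, true)  => offsetMillis + 1
      case (GreaterThan, false) => offsetMillis - 1
    }

  def main(args: Array[String]): Unit = {
    // "t1.c > t2.c - INTERVAL '2' SECOND" with t1 (the left input) on the left
    // side: raw offset -2000 ms, and the strict '>' moves the lower bound to
    // -1999 ms, matching the new JoinTest expectation (-1999, 1999).
    println(boundary(-2000L, GreaterThan, leftInputOnLeftSide = true)) // -1999
  }
}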
http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index ded5c51..8c1865c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -379,14 +379,27 @@ class JoinTest extends TableTestBase {
"rowtime")
verifyTimeBoundary(
- "t1.c - interval '2' second >= t2.c + interval '1' second -" +
- "interval '10' second and " +
+ "t1.c >= t2.c - interval '1' second and " +
+ "t1.c <= t2.c + interval '10' second",
+ -1000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c - interval '2' second >= t2.c + interval '1' second - interval '10' second and " +
"t1.c <= t2.c + interval '10' second",
-7000,
10000,
"rowtime")
verifyTimeBoundary(
+ "t2.c + interval '1' second - interval '10' second <= t1.c - interval '2' second and " +
+ "t2.c + interval '10' second >= t1.c",
+ -7000,
+ 10000,
+ "rowtime")
+
+ verifyTimeBoundary(
"t1.c >= t2.c - interval '10' second and " +
"t1.c <= t2.c - interval '5' second",
-10000,
@@ -394,6 +407,27 @@ class JoinTest extends TableTestBase {
"rowtime")
verifyTimeBoundary(
+ "t2.c - interval '10' second <= t1.c and " +
+ "t2.c - interval '5' second >= t1.c",
+ -10000,
+ -5000,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t1.c > t2.c - interval '2' second and " +
+ "t1.c < t2.c + interval '2' second",
+ -1999,
+ 1999,
+ "rowtime")
+
+ verifyTimeBoundary(
+ "t2.c > t1.c - interval '2' second and " +
+ "t2.c < t1.c + interval '2' second",
+ -1999,
+ 1999,
+ "rowtime")
+
+ verifyTimeBoundary(
"t1.c = t2.c",
0,
0,
[3/5] flink git commit: [FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.
Posted by fh...@apache.org.
[FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.
This closes #4977.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b20f70d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b20f70d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b20f70d
Branch: refs/heads/release-1.4
Commit: 1b20f70dea3fddfaeaf00ceae44e4dc0fcb4f47b
Parents: 8c60f97
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 8 01:17:57 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:51 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 17 ++--
docs/dev/table/tableApi.md | 26 +++---
.../table/runtime/join/WindowJoinUtil.scala | 95 ++++++++++++--------
.../flink/table/api/stream/sql/JoinTest.scala | 66 ++++++++++++++
.../sql/validation/JoinValidationTest.scala | 29 ++++++
.../flink/table/api/stream/table/JoinTest.scala | 34 +++++++
.../table/runtime/stream/sql/JoinITCase.scala | 47 ++++++++++
7 files changed, 259 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 2318271..3097d9e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,14 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join
- condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
-
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+
+ <ul>
+ <li><code>ltime = rtime</code></li>
+ <li><code>ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE</code></li>
+ <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight sql %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 7cce042..f5a2059 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,13 +527,14 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+ <ul>
+ <li><code>ltime === rtime</code></li>
+ <li><code>ltime >= rtime && ltime < rtime + 10.minutes</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight java %}
@@ -644,13 +645,14 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
- <ul>
- <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
- <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
- </ul>
- </p>
+ <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
+ <p>For example, the following predicates are valid window join conditions:</p>
+ <ul>
+ <li><code>'ltime === 'rtime</code></li>
+ <li><code>'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes</code></li>
+ </ul>
+
<p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight scala %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 863f342..1693c41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -78,29 +78,45 @@ object WindowJoinUtil {
// Converts the condition to conjunctive normal form (CNF)
val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
- // split the condition into time indicator condition and other condition
+ // split the condition into time predicates and other predicates
+ // We need two range predicates or an equality predicate for a properly bounded window join.
val (timePreds, otherPreds) = cnfCondition match {
- // We need at least two comparison predicates for a properly bounded window join.
- // So we need an AND expression for a valid window join.
- case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
- c.getOperands.asScala
- .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
- .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
- analyzed match {
- case Left(timePred) => (preds._1 :+ timePred, preds._2)
- case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
- }
- })
- case _ =>
- // No valid window bounds. A windowed stream join requires two comparison predicates that
- // bound the time in both directions.
- return (None, Some(predicate))
+ case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+ // extract all time predicates from conjunctive predicate
+ c.getOperands.asScala
+ .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+ .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+ analyzed match {
+ case Left(timePred) => (preds._1 :+ timePred, preds._2)
+ case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+ }
+ })
+ case c: RexCall =>
+ // extract time predicate if it exists
+ identifyTimePredicate(c, leftLogicalFieldCnt, inputSchema) match {
+ case Left(timePred) => (Seq[TimePredicate](timePred), Seq[RexNode]())
+ case Right(otherPred) => (Seq[TimePredicate](), Seq[RexNode](otherPred))
+ }
+ case _ =>
+ // No valid window bounds.
+ return (None, Some(predicate))
}
- if (timePreds.size != 2) {
- // No valid window bounds. A windowed stream join requires two comparison predicates that
- // bound the time in both directions.
- return (None, Some(predicate))
+ timePreds match {
+ case Seq() =>
+ return (None, Some(predicate))
+ case Seq(t) if t.pred.getKind != SqlKind.EQUALS =>
+ // single predicate must be equality predicate
+ return (None, Some(predicate))
+ case s@Seq(_, _) if s.exists(_.pred.getKind == SqlKind.EQUALS) =>
+ // pair of range predicates must not include an equality predicate
+ return (None, Some(predicate))
+ case Seq(_) =>
+ // Single equality predicate is OK
+ case Seq(_, _) =>
+ // Two range (i.e., non-equality) predicates are OK
+ case _ =>
+ return (None, Some(predicate))
}
// assemble window bounds from predicates
@@ -108,9 +124,14 @@ object WindowJoinUtil {
val (leftLowerBound, leftUpperBound) =
streamTimeOffsets match {
case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+ // two range predicates
(x.bound, y.bound)
case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+ // two range predicates
(y.bound, x.bound)
+ case Seq(Some(x: WindowBound)) =>
+ // single equality predicate
+ (x.bound, x.bound)
case _ =>
// Window join requires two comparison predicate that bound the time in both directions.
return (None, Some(predicate))
@@ -118,12 +139,12 @@ object WindowJoinUtil {
// compose the remain condition list into one condition
val remainCondition =
- otherPreds match {
- case Seq() =>
- None
- case _ =>
- Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
- }
+ otherPreds match {
+ case Seq() =>
+ None
+ case _ =>
+ Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
+ }
val bounds = if (timePreds.head.leftInputOnLeftSide) {
Some(WindowBounds(
@@ -146,14 +167,15 @@ object WindowJoinUtil {
/**
* Analyzes a predicate and identifies whether it is a valid predicate for a window join.
- * A valid window join predicate is a comparison predicate (<, <=, =>, >) that accesses
- * time attributes of both inputs, each input on a different side of the condition.
+ *
+ * A valid window join predicate is a range or equality predicate (<, <=, ==, >=, >) that
+ * accesses time attributes of both inputs, each input on a different side of the condition.
* Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
*
* Examples:
* - left.rowtime > right.rowtime + 2.minutes => valid
+ * - left.rowtime == right.rowtime => valid
* - left.proctime < right.rowtime + 2.minutes => invalid: different time type
- * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
* - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
*
* If the predicate is a regular join predicate, i.e., it accesses no time attribute it is
@@ -172,7 +194,8 @@ object WindowJoinUtil {
case SqlKind.GREATER_THAN |
SqlKind.GREATER_THAN_OR_EQUAL |
SqlKind.LESS_THAN |
- SqlKind.LESS_THAN_OR_EQUAL =>
+ SqlKind.LESS_THAN_OR_EQUAL |
+ SqlKind.EQUALS =>
val leftTerm = c.getOperands.get(0)
val rightTerm = c.getOperands.get(1)
@@ -235,7 +258,7 @@ object WindowJoinUtil {
*
* @return A Seq of all time attribute accessed in the expression.
*/
- def extractTimeAttributeAccesses(
+ private def extractTimeAttributeAccesses(
expr: RexNode,
leftFieldCount: Int,
inputType: RelDataType): Seq[TimeAttributeAccess] = {
@@ -248,9 +271,9 @@ object WindowJoinUtil {
case t: TimeIndicatorRelDataType =>
// time attribute access. Remember time type and side of input
if (idx < leftFieldCount) {
- Seq(TimeAttributeAccess(t.isEventTime, true, idx))
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = true, idx))
} else {
- Seq(TimeAttributeAccess(t.isEventTime, false, idx - leftFieldCount))
+ Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = false, idx - leftFieldCount))
}
case _ =>
// not a time attribute access.
@@ -272,7 +295,7 @@ object WindowJoinUtil {
* @param inputType The input type of the expression.
* @return True, if the expression accesses a non-time attribute. False otherwise.
*/
- def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+ private def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
expr match {
case i: RexInputRef =>
val accessedType = inputType.getFieldList.get(i.getIndex).getType
@@ -292,7 +315,7 @@ object WindowJoinUtil {
*
* @return window boundary, is left lower bound
*/
- def computeWindowBoundFromPredicate(
+ private def computeWindowBoundFromPredicate(
timePred: TimePredicate,
rexBuilder: RexBuilder,
config: TableConfig): Option[WindowBound] = {
@@ -303,6 +326,8 @@ object WindowJoinUtil {
timePred.leftInputOnLeftSide
case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
!timePred.leftInputOnLeftSide
+ case (SqlKind.EQUALS) =>
+ true // We don't care about this since there's only one bound value.
case _ =>
return None
}
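With the extraction logic above, the accepted time-predicate shapes are: exactly two range predicates, or exactly one equality predicate on time attributes of the same type (which yields a zero-width window, i.e. bounds [0, 0] as the updated JoinTest expects). A range predicate combined with an equality predicate, or an equality predicate with an offset, is still rejected, as the new validation tests in this commit show. The following is a hedged, self-contained sketch of that acceptance rule; the names are illustrative, not Flink API.

object TimePredicateShapeSketch {
  sealed trait TimePred
  case object Equality extends TimePred // e.g. ltime = rtime
  case object Range extends TimePred    // e.g. ltime < rtime + INTERVAL '10' SECOND

  // Mirrors the acceptance rule above: a window join needs either a single
  // equality time predicate or exactly two range time predicates; any other
  // combination leaves the join without valid window bounds.
  def hasValidWindowBounds(timePreds: Seq[TimePred]): Boolean = timePreds match {
    case Seq(Equality)     => true
    case Seq(Range, Range) => true
    case _                 => false // none, range + equality, three or more, ...
  }

  def main(args: Array[String]): Unit = {
    println(hasValidWindowBounds(Seq(Equality)))        // true
    println(hasValidWindowBounds(Seq(Range, Equality))) // false
  }
}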
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 53aff82..ded5c51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -185,6 +185,66 @@ class JoinTest extends TableTestBase {
}
@Test
+ def testJoinWithEquiProcTime(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime = t2.proctime
+ |""".stripMargin
+
+ val expected =
+ unaryNode("DataStreamCalc",
+ binaryNode("DataStreamWindowJoin",
+ unaryNode("DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "proctime")
+ ),
+ unaryNode("DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "proctime")
+ ),
+ term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+ term("join", "a", "proctime", "a0", "b", "proctime0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testJoinWithEquiRowTime(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.c = t2.c
+ |""".stripMargin
+
+ val expected =
+ unaryNode("DataStreamCalc",
+ binaryNode("DataStreamWindowJoin",
+ unaryNode("DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c")
+ ),
+ unaryNode("DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "c")
+ ),
+ term("where", "AND(=(a, a0), =(c, c0))"),
+ term("join", "a", "c", "a0", "b", "c0"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "b0 AS b")
+ )
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
val sqlQuery =
@@ -332,6 +392,12 @@ class JoinTest extends TableTestBase {
-10000,
-5000,
"rowtime")
+
+ verifyTimeBoundary(
+ "t1.c = t2.c",
+ 0,
+ 0,
+ "rowtime")
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
index 9cce37e..9f7078c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -92,4 +92,33 @@ class JoinValidationTest extends TableTestBase {
streamUtil.verifySql(sql, "n/a")
}
+ /** Validates that combined range and equality predicates are not accepted **/
+ @Test(expected = classOf[TableException])
+ def testRangeAndEqualityPredicates(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+ | t1.proctime = t2.proctime
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ /** Validates that an equality predicate with an offset is not accepted **/
+ @Test(expected = classOf[TableException])
+ def testEqualityPredicateWithOffset(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM MyTable t1, MyTable2 t2
+ |WHERE t1.a = t2.a AND
+ | t1.proctime = t2.proctime - INTERVAL '5' SECOND
+ | """.stripMargin
+
+ streamUtil.verifySql(sql, "n/a")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index 07e879f..79b413c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -100,6 +100,40 @@ class JoinTest extends TableTestBase {
util.verifyTable(resultTable, expected)
}
+ @Test
+ def testProcTimeWindowInnerJoinWithEquiTimeAttrs(): Unit = {
+ val util = streamTestUtil()
+ val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+ val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+ val resultTable = left.join(right)
+ .where('a === 'd && 'ltime === 'rtime)
+ .select('a, 'e, 'ltime)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamWindowJoin",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "ltime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "d", "e", "rtime")
+ ),
+ term("where", "AND(=(a, d), =(ltime, rtime))"),
+ term("join", "a", "ltime", "d", "e", "rtime"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+ )
+ util.verifyTable(resultTable, expected)
+ }
+
/**
* The time indicator can be accessed from non-time predicates now.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 1d7bab6..85929e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -187,6 +187,53 @@ class JoinITCase extends StreamingWithStateTestBase {
StreamITCase.compareWithList(expected)
}
+ /** test rowtime inner join with equi-times **/
+ @Test
+ def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ StreamITCase.clear
+
+ val sqlQuery =
+ """
+ |SELECT t2.key, t2.id, t1.id
+ |FROM T1 as t1 join T2 as t2 ON
+ | t1.key = t2.key AND
+ | t2.rt = t1.rt
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+
+ data1.+=((4, 4000L, "A", 4000L))
+ data1.+=((5, 5000L, "A", 5000L))
+ data1.+=((6, 6000L, "A", 6000L))
+ data1.+=((6, 6000L, "B", 6000L))
+
+ val data2 = new mutable.MutableList[(String, String, Long)]
+ data2.+=(("A", "R-5", 5000L))
+ data2.+=(("B", "R-6", 6000L))
+
+ val t1 = env.fromCollection(data1)
+ .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+ .toTable(tEnv, 'id, 'tm, 'key, 'rt.rowtime)
+ val t2 = env.fromCollection(data2)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = new java.util.ArrayList[String]
+ expected.add("A,R-5,5")
+ expected.add("B,R-6,6")
+ StreamITCase.compareWithList(expected)
+ }
+
/** test rowtime inner join with other conditions **/
@Test
def testRowTimeInnerJoinWithOtherConditions(): Unit = {