You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/29 14:06:18 UTC
flink git commit: [FLINK-6736] [table] Fix UDTF field access with
time attribute record
Repository: flink
Updated Branches:
refs/heads/master 6b69c588d -> 64ca0fa22
[FLINK-6736] [table] Fix UDTF field access with time attribute record
This closes #4008.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ca0fa2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ca0fa2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ca0fa2
Branch: refs/heads/master
Commit: 64ca0fa22ac0af21b3a0a193b769a24d44947f18
Parents: 6b69c58
Author: twalthr <tw...@apache.org>
Authored: Mon May 29 14:48:14 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon May 29 16:05:44 2017 +0200
----------------------------------------------------------------------
.../table/plan/nodes/CommonCorrelate.scala | 4 +-
.../calcite/RelTimeIndicatorConverterTest.scala | 8 ++--
.../datastream/TimeAttributesITCase.scala | 47 ++++++++++++++++----
3 files changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 874bea2..96e1f5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -53,6 +53,8 @@ trait CommonCorrelate {
functionClass: Class[T]):
GeneratedFunction[T, Row] = {
+ val physicalRexCall = inputSchema.mapRexNode(rexCall)
+
val functionGenerator = new CodeGenerator(
config,
false,
@@ -67,7 +69,7 @@ trait CommonCorrelate {
.addReusableConstructor(classOf[TableFunctionCollector[_]])
.head
- val call = functionGenerator.generateExpression(rexCall)
+ val call = functionGenerator.generateExpression(physicalRexCall)
var body =
s"""
|${call.resultTerm}.setCollector($collectorTerm);
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index 8963ee2..c315d94 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -201,7 +201,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
val func = new TableFunc
- val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
+ val result = t.join(func('rowtime, 'proctime, "") as 's).select('rowtime, 'proctime, 's)
val expected = unaryNode(
"DataStreamCalc",
@@ -209,7 +209,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
"DataStreamCorrelate",
streamTableNode(0),
term("invocation",
- s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
+ s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
term("function", func),
term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
"TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
@@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest {
class TableFunc extends TableFunction[String] {
val t = new Timestamp(0L)
- def eval(time1: Long, time2: Timestamp): Unit = {
- collect(time1.toString + time2.after(t))
+ def eval(time1: Long, time2: Timestamp, string: String): Unit = {
+ collect(time1.toString + time2.after(t) + string)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index f2d6175..c25dfdf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -159,20 +159,51 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
val func = new TableFunc
- val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
+ val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
val results = t.toAppendStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
val expected = Seq(
- "1970-01-01 00:00:00.001,1true",
- "1970-01-01 00:00:00.002,2true",
- "1970-01-01 00:00:00.003,3true",
- "1970-01-01 00:00:00.004,4true",
- "1970-01-01 00:00:00.007,7true",
- "1970-01-01 00:00:00.008,8true",
- "1970-01-01 00:00:00.016,16true")
+ "1970-01-01 00:00:00.001,1trueHi",
+ "1970-01-01 00:00:00.002,2trueHallo",
+ "1970-01-01 00:00:00.003,3trueHello",
+ "1970-01-01 00:00:00.004,4trueHello",
+ "1970-01-01 00:00:00.007,7trueHello",
+ "1970-01-01 00:00:00.008,8trueHello world",
+ "1970-01-01 00:00:00.016,16trueHello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testWindowAfterTableFunction(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(
+ tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
+ val func = new TableFunc
+
+ val t = table
+ .join(func('rowtime, 'proctime, 'string) as 's)
+ .window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.rowtime, 's.count)
+
+ val results = t.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.004,4",
+ "1970-01-01 00:00:00.009,2",
+ "1970-01-01 00:00:00.019,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}