You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/29 14:06:18 UTC

flink git commit: [FLINK-6736] [table] Fix UDTF field access with time attribute record

Repository: flink
Updated Branches:
  refs/heads/master 6b69c588d -> 64ca0fa22


[FLINK-6736] [table] Fix UDTF field access with time attribute record

This closes #4008.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ca0fa2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ca0fa2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ca0fa2

Branch: refs/heads/master
Commit: 64ca0fa22ac0af21b3a0a193b769a24d44947f18
Parents: 6b69c58
Author: twalthr <tw...@apache.org>
Authored: Mon May 29 14:48:14 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon May 29 16:05:44 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/CommonCorrelate.scala      |  4 +-
 .../calcite/RelTimeIndicatorConverterTest.scala |  8 ++--
 .../datastream/TimeAttributesITCase.scala       | 47 ++++++++++++++++----
 3 files changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 874bea2..96e1f5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -53,6 +53,8 @@ trait CommonCorrelate {
     functionClass: Class[T]):
   GeneratedFunction[T, Row] = {
 
+    val physicalRexCall = inputSchema.mapRexNode(rexCall)
+
     val functionGenerator = new CodeGenerator(
       config,
       false,
@@ -67,7 +69,7 @@ trait CommonCorrelate {
       .addReusableConstructor(classOf[TableFunctionCollector[_]])
       .head
 
-    val call = functionGenerator.generateExpression(rexCall)
+    val call = functionGenerator.generateExpression(physicalRexCall)
     var body =
       s"""
          |${call.resultTerm}.setCollector($collectorTerm);

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index 8963ee2..c315d94 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -201,7 +201,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
     val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
     val func = new TableFunc
 
-    val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
+    val result = t.join(func('rowtime, 'proctime, "") as 's).select('rowtime, 'proctime, 's)
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -209,7 +209,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation",
-          s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
+          s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
         term("function", func),
         term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
           "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
@@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest {
 
   class TableFunc extends TableFunction[String] {
     val t = new Timestamp(0L)
-    def eval(time1: Long, time2: Timestamp): Unit = {
-      collect(time1.toString + time2.after(t))
+    def eval(time1: Long, time2: Timestamp, string: String): Unit = {
+      collect(time1.toString + time2.after(t) + string)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca0fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index f2d6175..c25dfdf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -159,20 +159,51 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
     val func = new TableFunc
 
-    val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
+    val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's)
 
     val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
     val expected = Seq(
-      "1970-01-01 00:00:00.001,1true",
-      "1970-01-01 00:00:00.002,2true",
-      "1970-01-01 00:00:00.003,3true",
-      "1970-01-01 00:00:00.004,4true",
-      "1970-01-01 00:00:00.007,7true",
-      "1970-01-01 00:00:00.008,8true",
-      "1970-01-01 00:00:00.016,16true")
+      "1970-01-01 00:00:00.001,1trueHi",
+      "1970-01-01 00:00:00.002,2trueHallo",
+      "1970-01-01 00:00:00.003,3trueHello",
+      "1970-01-01 00:00:00.004,4trueHello",
+      "1970-01-01 00:00:00.007,7trueHello",
+      "1970-01-01 00:00:00.008,8trueHello world",
+      "1970-01-01 00:00:00.016,16trueHello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testWindowAfterTableFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
+    val func = new TableFunc
+
+    val t = table
+      .join(func('rowtime, 'proctime, 'string) as 's)
+      .window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.rowtime, 's.count)
+
+    val results = t.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.004,4",
+      "1970-01-01 00:00:00.009,2",
+      "1970-01-01 00:00:00.019,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }