You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 12:16:32 UTC

[flink] 01/04: [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 678fce06587a3df4693acbcc5f3a9fb32396aabe
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:18:56 2019 +0800

    [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
 .../java/org/apache/flink/table/expressions/RexNodeConverter.java     | 2 ++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 ++++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 ++++
 3 files changed, 10 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		conversionsOfBuiltInFunc
 				.put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+				exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
 	}
 
 	@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.isEmpty)
             CurrentRow()
 
+          case STREAM_RECORD_TIMESTAMP =>
+            assert(args.isEmpty)
+            StreamRecordTimestamp()
+
           case _ =>
             throw new TableException(s"Unsupported function definition: $fd")
         }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.isEmpty)
             CurrentRow()
 
+          case STREAM_RECORD_TIMESTAMP =>
+            assert(args.isEmpty)
+            StreamRecordTimestamp()
+
           case _ =>
             throw new TableException(s"Unsupported function definition: $fd")
         }