You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 12:16:32 UTC
[flink] 01/04: [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 678fce06587a3df4693acbcc5f3a9fb32396aabe
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:18:56 2019 +0800
[FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
.../java/org/apache/flink/table/expressions/RexNodeConverter.java | 2 ++
.../apache/flink/table/expressions/PlannerExpressionConverter.scala | 4 ++++
.../apache/flink/table/expressions/PlannerExpressionConverter.scala | 4 ++++
3 files changed, 10 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
conversionsOfBuiltInFunc
.put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+ conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+ exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
}
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
assert(args.isEmpty)
CurrentRow()
+ case STREAM_RECORD_TIMESTAMP =>
+ assert(args.isEmpty)
+ StreamRecordTimestamp()
+
case _ =>
throw new TableException(s"Unsupported function definition: $fd")
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
assert(args.isEmpty)
CurrentRow()
+ case STREAM_RECORD_TIMESTAMP =>
+ assert(args.isEmpty)
+ StreamRecordTimestamp()
+
case _ =>
throw new TableException(s"Unsupported function definition: $fd")
}